Skip to main content

lash_sqlite_store/
process_registry.rs

1//! SQLite-backed [`ProcessRegistry`] (`SqliteProcessRegistry`).
2//!
3//! Ported from the prior store, preserving the public async surface byte-for-byte
4//! (the type name keeps the `Sqlite` prefix so the path-rename swap keeps
5//! compiling). The prior implementation ran every op directly on `&rusqlite::Connection`
6//! with `.await`; here every DB body is a *synchronous* rusqlite closure handed
7//! to [`SqliteConnection::call`] (reads) or [`SqliteConnection::write_flow`]
8//! (read-then-write).
9//!
10//! ## Why `write_flow`, not `write`
11//!
12//! The registry's transactional methods produce a [`lash_core::PluginError`],
13//! not a `rusqlite::Error`. `SqliteConnection::write` rolls back only when the
14//! closure returns `Err(rusqlite::Error)`, so a logical `PluginError` (e.g. a
15//! registration-hash conflict) would otherwise *commit* the partial work. Each
16//! such method therefore runs its synchronous body returning
17//! `Result<T, PluginError>` and maps it to a [`TxOutcome`]: `Ok` ⇒
18//! `Commit(Ok(value))`, `Err` ⇒ `Rollback(Err(error))`. That preserves the
19//! prior behaviour of rolling back on every error while still carrying the
20//! `PluginError` back to the caller. The outer `rusqlite::Error` channel only
21//! carries genuine SQLite/connection failures, mapped via `process_sqlite_error`.
22//!
23//! The `*_conn` helpers are synchronous and take a `&rusqlite::Connection` so
24//! they compose inside either closure — including from within a `&Transaction`,
25//! which derefs to `&Connection`.
26
27use super::*;
28
29fn process_status_label(record: &ProcessRecord) -> &'static str {
30    record.status.label()
31}
32
33impl SqliteProcessRegistry {
34    pub async fn open(path: &Path) -> tokio_rusqlite::Result<Self> {
35        let conn = SqliteConnection::open(path).await?;
36        ensure_process_schema(&conn).await?;
37        apply_pragmas(&conn, StoreBacking::File).await?;
38        Ok(Self {
39            conn,
40            notify: tokio::sync::Notify::new(),
41        })
42    }
43
44    pub async fn memory() -> tokio_rusqlite::Result<Self> {
45        let conn = SqliteConnection::open_in_memory().await?;
46        ensure_process_schema(&conn).await?;
47        apply_pragmas(&conn, StoreBacking::Memory).await?;
48        Ok(Self {
49            conn,
50            notify: tokio::sync::Notify::new(),
51        })
52    }
53
54    fn load_process_conn(
55        conn: &Connection,
56        process_id: &str,
57    ) -> Result<Option<ProcessRecord>, lash_core::PluginError> {
58        let json: Option<String> = conn
59            .query_row(
60                "SELECT record_json FROM processes WHERE process_id = ?1",
61                params![process_id],
62                |row| row.get(0),
63            )
64            .optional()
65            .map_err(process_sqlite_error)?;
66        json.map(|json| serde_json::from_str(&json).map_err(process_decode_error))
67            .transpose()
68    }
69
70    fn save_process_conn(
71        conn: &Connection,
72        record: &ProcessRecord,
73    ) -> Result<(), lash_core::PluginError> {
74        conn.execute(
75            "UPDATE processes
76             SET updated_at_ms = ?2, status = ?3, record_json = ?4
77             WHERE process_id = ?1",
78            params![
79                record.id.as_str(),
80                record.updated_at_ms as i64,
81                process_status_label(record),
82                process_encode_json(record)?
83            ],
84        )
85        .map_err(process_sqlite_error)?;
86        Ok(())
87    }
88
89    fn load_event_by_key_conn(
90        conn: &Connection,
91        process_id: &str,
92        replay_key: &str,
93    ) -> Result<Option<(String, ProcessEvent)>, lash_core::PluginError> {
94        let row: Option<(String, String)> = conn
95            .query_row(
96                "SELECT payload_hash, event_json
97                 FROM process_events
98                 WHERE process_id = ?1 AND idempotency_key = ?2",
99                params![process_id, replay_key],
100                |row| Ok((row.get(0)?, row.get(1)?)),
101            )
102            .optional()
103            .map_err(process_sqlite_error)?;
104        row.map(|(hash, json)| {
105            serde_json::from_str(&json)
106                .map(|event| (hash, event))
107                .map_err(process_decode_error)
108        })
109        .transpose()
110    }
111
112    fn load_process_lease_conn(
113        conn: &Connection,
114        process_id: &str,
115    ) -> Result<Option<ProcessLease>, lash_core::PluginError> {
116        conn.query_row(
117            "SELECT lease_owner_id, lease_token, lease_fencing_token,
118                    lease_claimed_at_ms, lease_expires_at_ms
119             FROM process_leases
120             WHERE process_id = ?1",
121            params![process_id],
122            |row| {
123                let owner_id: Option<String> = row.get(0)?;
124                let lease_token: Option<String> = row.get(1)?;
125                let (Some(owner_id), Some(lease_token)) = (owner_id, lease_token) else {
126                    return Ok(None);
127                };
128                Ok(Some(ProcessLease {
129                    schema_version: PROCESS_LEASE_SCHEMA_VERSION,
130                    process_id: process_id.to_string(),
131                    owner_id,
132                    lease_token,
133                    fencing_token: row.get::<_, i64>(2)? as u64,
134                    claimed_at_epoch_ms: row.get::<_, i64>(3)? as u64,
135                    expires_at_epoch_ms: row.get::<_, i64>(4)? as u64,
136                }))
137            },
138        )
139        .optional()
140        .map(|lease| lease.flatten())
141        .map_err(process_sqlite_error)
142    }
143
144    fn list_grants_for_scope_conn(
145        conn: &Connection,
146        owner_scope: &ProcessScope,
147        live_only: bool,
148    ) -> Result<Vec<ProcessHandleGrantEntry>, lash_core::PluginError> {
149        let owner_scope_id = owner_scope.id();
150        let status_clause = if live_only {
151            "AND p.status = 'running'"
152        } else {
153            ""
154        };
155        let mut stmt = conn
156            .prepare(&format!(
157                "SELECT g.process_id, g.descriptor_json, p.record_json
158                 FROM process_handle_grants g
159                 JOIN processes p ON p.process_id = g.process_id
160                 WHERE g.scope_id = ?1 {status_clause}
161                 ORDER BY g.process_id ASC"
162            ))
163            .map_err(process_sqlite_error)?;
164        let rows = stmt
165            .query_map(params![owner_scope_id.as_str()], |row| {
166                Ok((
167                    row.get::<_, String>(0)?,
168                    row.get::<_, String>(1)?,
169                    row.get::<_, String>(2)?,
170                ))
171            })
172            .map_err(process_sqlite_error)?;
173        let mut entries = Vec::new();
174        for row in rows {
175            let (process_id, descriptor_json, record_json) = row.map_err(process_sqlite_error)?;
176            let descriptor: ProcessHandleDescriptor =
177                serde_json::from_str(&descriptor_json).map_err(process_decode_error)?;
178            let record: ProcessRecord =
179                serde_json::from_str(&record_json).map_err(process_decode_error)?;
180            entries.push((
181                ProcessHandleGrant {
182                    session_id: owner_scope.session_id.clone(),
183                    process_id,
184                    descriptor,
185                },
186                record,
187            ));
188        }
189        Ok(entries)
190    }
191}
192
193/// Map a `Result<T, PluginError>` produced by a synchronous transaction body to
194/// a [`TxOutcome`]: commit on success, roll back on logical error. Both arms
195/// carry the inner `Result` back so the caller recovers the value or the
196/// `PluginError` after the transaction resolves.
197fn tx_outcome<T>(
198    result: Result<T, lash_core::PluginError>,
199) -> TxOutcome<Result<T, lash_core::PluginError>> {
200    match result {
201        Ok(value) => TxOutcome::Commit(Ok(value)),
202        Err(err) => TxOutcome::Rollback(Err(err)),
203    }
204}
205
206#[async_trait::async_trait]
207impl ProcessRegistry for SqliteProcessRegistry {
208    fn durability_tier(&self) -> DurabilityTier {
209        DurabilityTier::Durable
210    }
211
212    async fn register_process(
213        &self,
214        registration: ProcessRegistration,
215    ) -> Result<ProcessRecord, lash_core::PluginError> {
216        let (registration, registration_hash) = prepare_process_registration(registration)?;
217        self.conn
218            .write_flow(move |tx| {
219                Ok(tx_outcome((|| {
220                    if let Some(existing) = Self::load_process_conn(tx, &registration.id)? {
221                        if existing.registration_hash == registration_hash {
222                            return Ok(existing);
223                        }
224                        return Err(lash_core::PluginError::Session(format!(
225                            "process `{}` registration hash conflict: existing {}, new {}",
226                            registration.id, existing.registration_hash, registration_hash
227                        )));
228                    }
229                    let now = current_epoch_ms();
230                    let record = ProcessRecord::from_prepared_registration(
231                        registration,
232                        registration_hash,
233                        now,
234                    );
235                    let owner_scope_id = record.owner_scope_id();
236                    tx.execute(
237                        "INSERT INTO processes (
238                            process_id, registration_hash, owner_scope_id, host_profile_id,
239                            created_at_ms, updated_at_ms, status, record_json
240                         )
241                         VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8)",
242                        params![
243                            record.id.as_str(),
244                            record.registration_hash.as_str(),
245                            owner_scope_id.as_str(),
246                            record.host_profile_id(),
247                            record.created_at_ms as i64,
248                            record.updated_at_ms as i64,
249                            process_status_label(&record),
250                            process_encode_json(&record)?,
251                        ],
252                    )
253                    .map_err(process_sqlite_error)?;
254                    Ok(record)
255                })()))
256            })
257            .await
258            .map_err(process_sqlite_error)?
259    }
260
261    async fn set_external_ref(
262        &self,
263        process_id: &str,
264        external_ref: ProcessExternalRef,
265    ) -> Result<ProcessRecord, lash_core::PluginError> {
266        let process_id = process_id.to_string();
267        self.conn
268            .write_flow(move |tx| {
269                Ok(tx_outcome((|| {
270                    let mut record =
271                        Self::load_process_conn(tx, &process_id)?.ok_or_else(|| {
272                            lash_core::PluginError::Session(format!(
273                                "unknown process `{process_id}`"
274                            ))
275                        })?;
276                    record.external_ref = Some(external_ref);
277                    record.updated_at_ms = current_epoch_ms();
278                    Self::save_process_conn(tx, &record)?;
279                    Ok(record)
280                })()))
281            })
282            .await
283            .map_err(process_sqlite_error)?
284    }
285
286    async fn grant_handle(
287        &self,
288        owner_scope: &ProcessScope,
289        process_id: &str,
290        descriptor: ProcessHandleDescriptor,
291    ) -> Result<ProcessHandleGrant, lash_core::PluginError> {
292        let owner_scope = owner_scope.clone();
293        let process_id = process_id.to_string();
294        self.conn
295            .write_flow(move |tx| {
296                Ok(tx_outcome((|| {
297                    let owner_scope_id = owner_scope.id();
298                    if Self::load_process_conn(tx, &process_id)?.is_none() {
299                        return Err(lash_core::PluginError::Session(format!(
300                            "unknown process `{process_id}`"
301                        )));
302                    }
303                    tx.execute(
304                        "INSERT INTO process_handle_grants (session_id, scope_id, process_id, descriptor_json)
305                         VALUES (?1, ?2, ?3, ?4)
306                         ON CONFLICT(scope_id, process_id) DO UPDATE SET
307                            session_id = excluded.session_id,
308                            descriptor_json = excluded.descriptor_json",
309                        params![
310                            owner_scope.session_id.as_str(),
311                            owner_scope_id.as_str(),
312                            process_id.as_str(),
313                            process_encode_json(&descriptor)?
314                        ],
315                    )
316                    .map_err(process_sqlite_error)?;
317                    Ok(ProcessHandleGrant {
318                        session_id: owner_scope.session_id.clone(),
319                        process_id: process_id.clone(),
320                        descriptor,
321                    })
322                })()))
323            })
324            .await
325            .map_err(process_sqlite_error)?
326    }
327
328    async fn revoke_handle(
329        &self,
330        owner_scope: &ProcessScope,
331        process_id: &str,
332    ) -> Result<(), lash_core::PluginError> {
333        let owner_scope_id = owner_scope.id().as_str().to_string();
334        let process_id = process_id.to_string();
335        self.conn
336            .call(move |conn| {
337                conn.execute(
338                    "DELETE FROM process_handle_grants WHERE scope_id = ?1 AND process_id = ?2",
339                    params![owner_scope_id, process_id],
340                )
341            })
342            .await
343            .map_err(process_sqlite_error)?;
344        Ok(())
345    }
346
347    async fn transfer_handle_grants(
348        &self,
349        from_scope: &ProcessScope,
350        to_scope: &ProcessScope,
351        process_ids: &[String],
352    ) -> Result<(), lash_core::PluginError> {
353        let from_scope = from_scope.clone();
354        let to_scope = to_scope.clone();
355        let process_ids = process_ids.to_vec();
356        self.conn
357            .write_flow(move |tx| {
358                Ok(tx_outcome((|| {
359                    let from_scope_id = from_scope.id();
360                    let to_scope_id = to_scope.id();
361                    for process_id in &process_ids {
362                        let descriptor_json: Option<String> = tx
363                            .query_row(
364                                "SELECT descriptor_json
365                                 FROM process_handle_grants
366                                 WHERE scope_id = ?1 AND process_id = ?2",
367                                params![from_scope_id.as_str(), process_id.as_str()],
368                                |row| row.get(0),
369                            )
370                            .optional()
371                            .map_err(process_sqlite_error)?;
372                        let Some(descriptor_json) = descriptor_json else {
373                            return Err(lash_core::PluginError::Session(format!(
374                                "process handle `{process_id}` is not granted to session `{}`",
375                                from_scope.session_id
376                            )));
377                        };
378                        tx.execute(
379                            "DELETE FROM process_handle_grants
380                             WHERE scope_id = ?1 AND process_id = ?2",
381                            params![from_scope_id.as_str(), process_id.as_str()],
382                        )
383                        .map_err(process_sqlite_error)?;
384                        tx.execute(
385                            "INSERT INTO process_handle_grants (session_id, scope_id, process_id, descriptor_json)
386                             VALUES (?1, ?2, ?3, ?4)
387                             ON CONFLICT(scope_id, process_id) DO UPDATE SET
388                                session_id = excluded.session_id,
389                                descriptor_json = excluded.descriptor_json",
390                            params![
391                                to_scope.session_id.as_str(),
392                                to_scope_id.as_str(),
393                                process_id.as_str(),
394                                descriptor_json
395                            ],
396                        )
397                        .map_err(process_sqlite_error)?;
398                    }
399                    Ok(())
400                })()))
401            })
402            .await
403            .map_err(process_sqlite_error)?
404    }
405
406    async fn list_handle_grants(
407        &self,
408        owner_scope: &ProcessScope,
409    ) -> Result<Vec<ProcessHandleGrantEntry>, lash_core::PluginError> {
410        let owner_scope = owner_scope.clone();
411        self.conn
412            .call(move |conn| Ok(Self::list_grants_for_scope_conn(conn, &owner_scope, false)))
413            .await
414            .map_err(process_sqlite_error)?
415    }
416
417    async fn list_live_handle_grants(
418        &self,
419        owner_scope: &ProcessScope,
420    ) -> Result<Vec<ProcessHandleGrantEntry>, lash_core::PluginError> {
421        let owner_scope = owner_scope.clone();
422        self.conn
423            .call(move |conn| Ok(Self::list_grants_for_scope_conn(conn, &owner_scope, true)))
424            .await
425            .map_err(process_sqlite_error)?
426    }
427
428    async fn has_handle_grant(
429        &self,
430        owner_scope: &ProcessScope,
431        process_id: &str,
432    ) -> Result<bool, lash_core::PluginError> {
433        let owner_scope_id = owner_scope.id().as_str().to_string();
434        let process_id = process_id.to_string();
435        self.conn
436            .call(move |conn| {
437                let exists = conn
438                    .query_row(
439                        "SELECT 1
440                         FROM process_handle_grants g
441                         JOIN processes p ON p.process_id = g.process_id
442                         WHERE g.scope_id = ?1 AND g.process_id = ?2
443                         LIMIT 1",
444                        params![owner_scope_id, process_id],
445                        |_| Ok(()),
446                    )
447                    .optional()?
448                    .is_some();
449                Ok(exists)
450            })
451            .await
452            .map_err(process_sqlite_error)
453    }
454
455    async fn handle_grants_for_process(
456        &self,
457        process_id: &str,
458    ) -> Result<Vec<ProcessHandleGrant>, lash_core::PluginError> {
459        let process_id = process_id.to_string();
460        self.conn
461            .call(move |conn| {
462                Ok((|| {
463                    if Self::load_process_conn(conn, &process_id)?.is_none() {
464                        return Err(lash_core::PluginError::Session(format!(
465                            "unknown process `{process_id}`"
466                        )));
467                    }
468                    let mut stmt = conn
469                        .prepare(
470                            "SELECT session_id, descriptor_json
471                             FROM process_handle_grants
472                             WHERE process_id = ?1
473                             ORDER BY session_id ASC, scope_id ASC",
474                        )
475                        .map_err(process_sqlite_error)?;
476                    let rows = stmt
477                        .query_map(params![process_id], |row| {
478                            Ok((row.get::<_, String>(0)?, row.get::<_, String>(1)?))
479                        })
480                        .map_err(process_sqlite_error)?;
481                    let mut grants = Vec::new();
482                    for row in rows {
483                        let (session_id, descriptor_json) = row.map_err(process_sqlite_error)?;
484                        let descriptor: ProcessHandleDescriptor =
485                            serde_json::from_str(&descriptor_json).map_err(process_decode_error)?;
486                        grants.push(ProcessHandleGrant {
487                            session_id,
488                            process_id: process_id.clone(),
489                            descriptor,
490                        });
491                    }
492                    Ok(grants)
493                })())
494            })
495            .await
496            .map_err(process_sqlite_error)?
497    }
498
499    async fn delete_session_process_state(
500        &self,
501        session_id: &str,
502    ) -> Result<lash_core::ProcessSessionDeleteReport, lash_core::PluginError> {
503        let session_id_owned = session_id.to_string();
504        let (revoked_handle_count, mut cancel_process_ids, mut preserved_process_ids) = self
505            .conn
506            .write_flow(move |tx| {
507                Ok(tx_outcome((|| {
508                    let session_id = session_id_owned;
509                    let removed = {
510                        let mut stmt = tx
511                            .prepare(
512                                "SELECT g.process_id, p.record_json
513                                 FROM process_handle_grants g
514                                 JOIN processes p ON p.process_id = g.process_id
515                                 WHERE g.session_id = ?1
516                                 ORDER BY g.process_id ASC",
517                            )
518                            .map_err(process_sqlite_error)?;
519                        let rows = stmt
520                            .query_map(params![session_id], |row| {
521                                Ok((row.get::<_, String>(0)?, row.get::<_, String>(1)?))
522                            })
523                            .map_err(process_sqlite_error)?;
524                        let mut removed = Vec::new();
525                        for row in rows {
526                            let (process_id, record_json) = row.map_err(process_sqlite_error)?;
527                            let record: ProcessRecord =
528                                serde_json::from_str(&record_json).map_err(process_decode_error)?;
529                            removed.push((process_id, record));
530                        }
531                        removed
532                    };
533
534                    let revoked_handle_count = tx
535                        .execute(
536                            "DELETE FROM process_handle_grants WHERE session_id = ?1",
537                            params![session_id],
538                        )
539                        .map_err(process_sqlite_error)?;
540                    let mut cancel_process_ids = Vec::new();
541                    let mut preserved_process_ids = Vec::new();
542                    for (process_id, record) in removed {
543                        if record.is_terminal() {
544                            continue;
545                        }
546                        let remaining_grants: i64 = tx
547                            .query_row(
548                                "SELECT COUNT(*) FROM process_handle_grants WHERE process_id = ?1",
549                                params![process_id],
550                                |row| row.get(0),
551                            )
552                            .map_err(process_sqlite_error)?;
553                        if remaining_grants == 0 {
554                            cancel_process_ids.push(process_id);
555                        } else {
556                            preserved_process_ids.push(process_id);
557                        }
558                    }
559                    Ok((
560                        revoked_handle_count,
561                        cancel_process_ids,
562                        preserved_process_ids,
563                    ))
564                })()))
565            })
566            .await
567            .map_err(process_sqlite_error)??;
568        cancel_process_ids.sort();
569        cancel_process_ids.dedup();
570        preserved_process_ids.sort();
571        preserved_process_ids.dedup();
572        Ok(lash_core::ProcessSessionDeleteReport {
573            session_id: session_id.to_string(),
574            revoked_handle_count,
575            deleted_wake_count: 0,
576            cancel_process_ids,
577            preserved_process_ids,
578        })
579    }
580
581    async fn append_event(
582        &self,
583        process_id: &str,
584        request: ProcessEventAppendRequest,
585    ) -> Result<ProcessEventAppendResult, lash_core::PluginError> {
586        let process_id = process_id.to_string();
587        let (result, appended) = self
588            .conn
589            .write_flow(move |tx| {
590                Ok(tx_outcome((|| {
591                    let mut record =
592                        Self::load_process_conn(tx, &process_id)?.ok_or_else(|| {
593                            lash_core::PluginError::Session(format!(
594                                "unknown process `{process_id}`"
595                            ))
596                        })?;
597                    let replay_lookup = if let Some(replay_key) =
598                        request.replay.as_ref().map(|replay| replay.key.as_str())
599                    {
600                        Self::load_event_by_key_conn(tx, &process_id, replay_key)?
601                    } else {
602                        None
603                    };
604                    let sequence = tx
605                        .query_row(
606                            "SELECT COALESCE(MAX(sequence), 0) + 1 FROM process_events WHERE process_id = ?1",
607                            params![process_id],
608                            |row| row.get::<_, i64>(0),
609                        )
610                        .map_err(process_sqlite_error)? as u64;
611                    let occurred_at_ms = current_epoch_ms();
612                    let prepared = prepare_process_event_append(
613                        &record,
614                        request,
615                        sequence,
616                        replay_lookup,
617                        occurred_at_ms,
618                    )?;
619                    if prepared.replayed {
620                        return Ok((
621                            ProcessEventAppendResult {
622                                event: prepared.event,
623                                wake_delivery: prepared.wake_delivery,
624                            },
625                            false,
626                        ));
627                    }
628                    let event = prepared.event;
629                    tx.execute(
630                        "INSERT INTO process_events (
631                            process_id, sequence, event_type, payload_hash, idempotency_key,
632                            occurred_at_ms, event_json
633                         )
634                         VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7)",
635                        params![
636                            process_id,
637                            sequence as i64,
638                            event.event_type.as_str(),
639                            prepared.payload_hash.as_str(),
640                            event.invocation.replay_key(),
641                            prepared.occurred_at_ms as i64,
642                            process_encode_json(&event)?,
643                        ],
644                    )
645                    .map_err(process_sqlite_error)?;
646                    if let Some(status) = prepared.status_update.clone() {
647                        record.status = status;
648                    }
649                    record.updated_at_ms = prepared.occurred_at_ms;
650                    Self::save_process_conn(tx, &record)?;
651                    Ok((
652                        ProcessEventAppendResult {
653                            event,
654                            wake_delivery: prepared.wake_delivery,
655                        },
656                        true,
657                    ))
658                })()))
659            })
660            .await
661            .map_err(process_sqlite_error)??;
662        if appended {
663            self.notify.notify_waiters();
664        }
665        Ok(result)
666    }
667
668    async fn events_after(
669        &self,
670        process_id: &str,
671        after_sequence: u64,
672    ) -> Result<Vec<ProcessEvent>, lash_core::PluginError> {
673        let process_id = process_id.to_string();
674        self.conn
675            .call(move |conn| {
676                Ok((|| {
677                    if Self::load_process_conn(conn, &process_id)?.is_none() {
678                        return Err(lash_core::PluginError::Session(format!(
679                            "unknown process `{process_id}`"
680                        )));
681                    }
682                    let mut stmt = conn
683                        .prepare(
684                            "SELECT event_json FROM process_events
685                             WHERE process_id = ?1 AND sequence > ?2
686                             ORDER BY sequence ASC",
687                        )
688                        .map_err(process_sqlite_error)?;
689                    let rows = stmt
690                        .query_map(params![process_id, after_sequence as i64], |row| {
691                            row.get::<_, String>(0)
692                        })
693                        .map_err(process_sqlite_error)?;
694                    let mut events = Vec::new();
695                    for row in rows {
696                        events.push(
697                            serde_json::from_str(&row.map_err(process_sqlite_error)?)
698                                .map_err(process_decode_error)?,
699                        );
700                    }
701                    Ok(events)
702                })())
703            })
704            .await
705            .map_err(process_sqlite_error)?
706    }
707
708    async fn wake_events_after(
709        &self,
710        process_id: &str,
711        after_sequence: u64,
712    ) -> Result<Vec<ProcessEvent>, lash_core::PluginError> {
713        let acked: std::collections::HashSet<u64> = {
714            let process_id = process_id.to_string();
715            self.conn
716                .call(move |conn| {
717                    Ok(
718                        (|| -> Result<std::collections::HashSet<u64>, lash_core::PluginError> {
719                            let mut stmt = conn
720                                .prepare(
721                                    "SELECT sequence FROM process_wake_acks WHERE process_id = ?1",
722                                )
723                                .map_err(process_sqlite_error)?;
724                            let rows = stmt
725                                .query_map(params![process_id], |row| row.get::<_, i64>(0))
726                                .map_err(process_sqlite_error)?;
727                            let mut set = std::collections::HashSet::new();
728                            for row in rows {
729                                set.insert(row.map_err(process_sqlite_error)? as u64);
730                            }
731                            Ok(set)
732                        })(),
733                    )
734                })
735                .await
736                .map_err(process_sqlite_error)??
737        };
738        Ok(self
739            .events_after(process_id, after_sequence)
740            .await?
741            .into_iter()
742            .filter(|event| event.semantics.wake.is_some() && !acked.contains(&event.sequence))
743            .collect())
744    }
745
746    async fn wait_event_after(
747        &self,
748        process_id: &str,
749        event_type: &str,
750        after_sequence: u64,
751    ) -> Result<ProcessEvent, lash_core::PluginError> {
752        loop {
753            if let Some(event) = self
754                .events_after(process_id, after_sequence)
755                .await?
756                .into_iter()
757                .find(|event| event.event_type == event_type)
758            {
759                return Ok(event);
760            }
761            tokio::select! {
762                _ = self.notify.notified() => {}
763                _ = tokio::time::sleep(Duration::from_millis(50)) => {}
764            }
765        }
766    }
767
768    async fn await_process(
769        &self,
770        process_id: &str,
771    ) -> Result<ProcessAwaitOutput, lash_core::PluginError> {
772        loop {
773            let record = self.get_process(process_id).await.ok_or_else(|| {
774                lash_core::PluginError::Session(format!("unknown process `{process_id}`"))
775            })?;
776            if let Some(await_output) = record.status.await_output() {
777                return Ok(await_output.clone());
778            }
779            tokio::select! {
780                _ = self.notify.notified() => {}
781                _ = tokio::time::sleep(Duration::from_millis(50)) => {}
782            }
783        }
784    }
785
786    async fn complete_process(
787        &self,
788        process_id: &str,
789        await_output: ProcessAwaitOutput,
790    ) -> Result<ProcessRecord, lash_core::PluginError> {
791        let event_type = match await_output.terminal_state() {
792            lash_core::ProcessTerminalState::Completed => "process.completed",
793            lash_core::ProcessTerminalState::Failed => "process.failed",
794            lash_core::ProcessTerminalState::Cancelled => "process.cancelled",
795        };
796        self.append_event(
797            process_id,
798            ProcessEventAppendRequest::new(
799                event_type,
800                serde_json::json!({ "await_output": await_output }),
801            )
802            .with_replay_key(format!("process:{process_id}:terminal:{event_type}")),
803        )
804        .await?;
805        self.get_process(process_id).await.ok_or_else(|| {
806            lash_core::PluginError::Session(format!(
807                "unknown process `{process_id}` after terminal event"
808            ))
809        })
810    }
811
812    async fn get_process(&self, process_id: &str) -> Option<ProcessRecord> {
813        let process_id = process_id.to_string();
814        self.conn
815            .call(move |conn| Ok(Self::load_process_conn(conn, &process_id).ok().flatten()))
816            .await
817            .ok()
818            .flatten()
819    }
820
821    async fn ack_wake(
822        &self,
823        process_id: &str,
824        sequence: u64,
825    ) -> Result<(), lash_core::PluginError> {
826        let process_id = process_id.to_string();
827        self.conn
828            .call(move |conn| {
829                Ok((|| {
830                    if Self::load_process_conn(conn, &process_id)?.is_none() {
831                        return Err(lash_core::PluginError::Session(format!(
832                            "unknown process `{process_id}`"
833                        )));
834                    }
835                    conn.execute(
836                        "INSERT OR IGNORE INTO process_wake_acks (process_id, sequence) VALUES (?1, ?2)",
837                        params![process_id, sequence as i64],
838                    )
839                    .map_err(process_sqlite_error)?;
840                    Ok(())
841                })())
842            })
843            .await
844            .map_err(process_sqlite_error)?
845    }
846
847    async fn list_non_terminal(&self) -> Result<Vec<ProcessRecord>, lash_core::PluginError> {
848        self.conn
849            .call(move |conn| {
850                Ok((|| {
851                    let mut stmt = conn
852                        .prepare(
853                            "SELECT record_json FROM processes
854                             WHERE status = 'running'
855                             ORDER BY process_id ASC",
856                        )
857                        .map_err(process_sqlite_error)?;
858                    let rows = stmt
859                        .query_map([], |row| row.get::<_, String>(0))
860                        .map_err(process_sqlite_error)?;
861                    let mut records = Vec::new();
862                    for row in rows {
863                        let record: ProcessRecord =
864                            serde_json::from_str(&row.map_err(process_sqlite_error)?)
865                                .map_err(process_decode_error)?;
866                        records.push(record);
867                    }
868                    Ok(records)
869                })())
870            })
871            .await
872            .map_err(process_sqlite_error)?
873    }
874
875    async fn claim_process_lease(
876        &self,
877        process_id: &str,
878        owner_id: &str,
879        lease_ttl_ms: u64,
880    ) -> Result<ProcessLease, lash_core::PluginError> {
881        let process_id = process_id.to_string();
882        let owner_id = owner_id.to_string();
883        self.conn
884            .write_flow(move |tx| {
885                Ok(tx_outcome((|| {
886                    if Self::load_process_conn(tx, &process_id)?.is_none() {
887                        return Err(lash_core::PluginError::Session(format!(
888                            "unknown process `{process_id}`"
889                        )));
890                    }
891                    let now = current_epoch_ms();
892                    let current = Self::load_process_lease_conn(tx, &process_id)?;
893                    if let Some(current) = current.as_ref()
894                        && current.expires_at_epoch_ms > now
895                        && current.owner_id != owner_id
896                    {
897                        return Err(process_lease_conflict(&process_id, current));
898                    }
899                    // Read the raw fencing token directly: a completed/abandoned
900                    // lease nulls the owner/token columns but retains the
901                    // monotonically-increasing `lease_fencing_token`, so a
902                    // re-claim never reuses a stale writer's token.
903                    let fencing_token: u64 = tx
904                        .query_row(
905                            "SELECT lease_fencing_token FROM process_leases WHERE process_id = ?1",
906                            params![process_id],
907                            |row| row.get::<_, i64>(0),
908                        )
909                        .optional()
910                        .map_err(process_sqlite_error)?
911                        .unwrap_or(0) as u64
912                        + 1;
913                    let lease = ProcessLease {
914                        schema_version: PROCESS_LEASE_SCHEMA_VERSION,
915                        process_id: process_id.clone(),
916                        owner_id: owner_id.clone(),
917                        lease_token: format!(
918                            "{:x}",
919                            Sha256::digest(
920                                format!("{process_id}:{owner_id}:{now}:{fencing_token}").as_bytes()
921                            )
922                        ),
923                        fencing_token,
924                        claimed_at_epoch_ms: now,
925                        expires_at_epoch_ms: now.saturating_add(lease_ttl_ms),
926                    };
927                    tx.execute(
928                        "INSERT INTO process_leases (
929                            process_id, lease_owner_id, lease_token, lease_fencing_token,
930                            lease_claimed_at_ms, lease_expires_at_ms
931                         )
932                         VALUES (?1, ?2, ?3, ?4, ?5, ?6)
933                         ON CONFLICT(process_id) DO UPDATE SET
934                            lease_owner_id = excluded.lease_owner_id,
935                            lease_token = excluded.lease_token,
936                            lease_fencing_token = excluded.lease_fencing_token,
937                            lease_claimed_at_ms = excluded.lease_claimed_at_ms,
938                            lease_expires_at_ms = excluded.lease_expires_at_ms",
939                        params![
940                            lease.process_id.as_str(),
941                            lease.owner_id.as_str(),
942                            lease.lease_token.as_str(),
943                            lease.fencing_token as i64,
944                            lease.claimed_at_epoch_ms as i64,
945                            lease.expires_at_epoch_ms as i64,
946                        ],
947                    )
948                    .map_err(process_sqlite_error)?;
949                    Ok(lease)
950                })()))
951            })
952            .await
953            .map_err(process_sqlite_error)?
954    }
955
956    async fn renew_process_lease(
957        &self,
958        lease: &ProcessLease,
959        lease_ttl_ms: u64,
960    ) -> Result<ProcessLease, lash_core::PluginError> {
961        let lease = lease.clone();
962        self.conn
963            .write_flow(move |tx| {
964                Ok(tx_outcome((|| {
965                    let now = current_epoch_ms();
966                    let current = Self::load_process_lease_conn(tx, &lease.process_id)?;
967                    if !guard_lease(current.as_ref(), &lease.lease_token, now) {
968                        return Err(process_lease_expired(&lease.process_id));
969                    }
970                    let renewed = ProcessLease {
971                        expires_at_epoch_ms: now.saturating_add(lease_ttl_ms),
972                        ..lease.clone()
973                    };
974                    tx.execute(
975                        "UPDATE process_leases
976                         SET lease_expires_at_ms = ?2
977                         WHERE process_id = ?1 AND lease_token = ?3",
978                        params![
979                            renewed.process_id.as_str(),
980                            renewed.expires_at_epoch_ms as i64,
981                            renewed.lease_token.as_str(),
982                        ],
983                    )
984                    .map_err(process_sqlite_error)?;
985                    Ok(renewed)
986                })()))
987            })
988            .await
989            .map_err(process_sqlite_error)?
990    }
991
992    async fn complete_process_lease(
993        &self,
994        completion: &ProcessLeaseCompletion,
995    ) -> Result<(), lash_core::PluginError> {
996        let process_id = completion.process_id.clone();
997        let lease_token = completion.lease_token.clone();
998        self.conn
999            .call(move |conn| {
1000                conn.execute(
1001                    "UPDATE process_leases
1002                     SET lease_owner_id = NULL,
1003                         lease_token = NULL,
1004                         lease_claimed_at_ms = 0,
1005                         lease_expires_at_ms = 0
1006                     WHERE process_id = ?1 AND lease_token = ?2",
1007                    params![process_id, lease_token],
1008                )
1009            })
1010            .await
1011            .map_err(process_sqlite_error)?;
1012        Ok(())
1013    }
1014}
1015
1016/// Loud, stable error for a fenced process-lease claim on the `PluginError`
1017/// channel the [`ProcessRegistry`] trait returns.
1018fn process_lease_conflict(process_id: &str, current: &ProcessLease) -> lash_core::PluginError {
1019    lash_core::PluginError::Session(format!(
1020        "process `{process_id}` is already leased by `{}` until {}",
1021        current.owner_id, current.expires_at_epoch_ms
1022    ))
1023}
1024
1025/// Loud, stable error for a superseded or expired process lease.
1026fn process_lease_expired(process_id: &str) -> lash_core::PluginError {
1027    lash_core::PluginError::Session(format!(
1028        "process lease for `{process_id}` is missing or expired"
1029    ))
1030}