Skip to main content

lash_sqlite_store/
process_registry.rs

1//! SQLite-backed [`ProcessRegistry`] (`SqliteProcessRegistry`).
2//!
3//! Ported from the prior store, preserving the public async surface byte-for-byte
4//! (the type name keeps the `Sqlite` prefix so the path-rename swap keeps
5//! compiling). The prior implementation ran every op directly on `&rusqlite::Connection`
6//! with `.await`; here every DB body is a *synchronous* rusqlite closure handed
7//! to [`SqliteConnection::call`] (reads) or [`SqliteConnection::write_flow`]
8//! (read-then-write).
9//!
10//! ## Why `write_flow`, not `write`
11//!
12//! The registry's transactional methods produce a [`lash_core::PluginError`],
13//! not a `rusqlite::Error`. `SqliteConnection::write` rolls back only when the
14//! closure returns `Err(rusqlite::Error)`, so a logical `PluginError` (e.g. a
15//! registration-hash conflict) would otherwise *commit* the partial work. Each
16//! such method therefore runs its synchronous body returning
17//! `Result<T, PluginError>` and maps it to a [`TxOutcome`]: `Ok` ⇒
18//! `Commit(Ok(value))`, `Err` ⇒ `Rollback(Err(error))`. That preserves the
19//! prior behaviour of rolling back on every error while still carrying the
20//! `PluginError` back to the caller. The outer `rusqlite::Error` channel only
21//! carries genuine SQLite/connection failures, mapped via `process_sqlite_error`.
22//!
23//! The `*_conn` helpers are synchronous and take a `&rusqlite::Connection` so
24//! they compose inside either closure — including from within a `&Transaction`,
25//! which derefs to `&Connection`.
26
27use super::*;
28
29fn process_status_label(record: &ProcessRecord) -> &'static str {
30    record.status.label()
31}
32
33impl SqliteProcessRegistry {
34    pub async fn open(path: &Path) -> tokio_rusqlite::Result<Self> {
35        let conn = SqliteConnection::open(path).await?;
36        ensure_process_schema(&conn).await?;
37        apply_pragmas(&conn, StoreBacking::File).await?;
38        Ok(Self {
39            conn,
40            notify: tokio::sync::Notify::new(),
41        })
42    }
43
44    pub async fn memory() -> tokio_rusqlite::Result<Self> {
45        let conn = SqliteConnection::open_in_memory().await?;
46        ensure_process_schema(&conn).await?;
47        apply_pragmas(&conn, StoreBacking::Memory).await?;
48        Ok(Self {
49            conn,
50            notify: tokio::sync::Notify::new(),
51        })
52    }
53
54    fn load_process_conn(
55        conn: &Connection,
56        process_id: &str,
57    ) -> Result<Option<ProcessRecord>, lash_core::PluginError> {
58        let json: Option<String> = conn
59            .query_row(
60                "SELECT record_json FROM processes WHERE process_id = ?1",
61                params![process_id],
62                |row| row.get(0),
63            )
64            .optional()
65            .map_err(process_sqlite_error)?;
66        json.map(|json| serde_json::from_str(&json).map_err(process_decode_error))
67            .transpose()
68    }
69
70    fn save_process_conn(
71        conn: &Connection,
72        record: &ProcessRecord,
73    ) -> Result<(), lash_core::PluginError> {
74        conn.execute(
75            "UPDATE processes
76             SET updated_at_ms = ?2, status = ?3, record_json = ?4
77             WHERE process_id = ?1",
78            params![
79                record.id.as_str(),
80                record.updated_at_ms as i64,
81                process_status_label(record),
82                process_encode_json(record)?
83            ],
84        )
85        .map_err(process_sqlite_error)?;
86        Ok(())
87    }
88
89    fn load_event_by_key_conn(
90        conn: &Connection,
91        process_id: &str,
92        replay_key: &str,
93    ) -> Result<Option<(String, ProcessEvent)>, lash_core::PluginError> {
94        let row: Option<(String, String)> = conn
95            .query_row(
96                "SELECT payload_hash, event_json
97                 FROM process_events
98                 WHERE process_id = ?1 AND idempotency_key = ?2",
99                params![process_id, replay_key],
100                |row| Ok((row.get(0)?, row.get(1)?)),
101            )
102            .optional()
103            .map_err(process_sqlite_error)?;
104        row.map(|(hash, json)| {
105            serde_json::from_str(&json)
106                .map(|event| (hash, event))
107                .map_err(process_decode_error)
108        })
109        .transpose()
110    }
111
112    fn load_process_lease_conn(
113        conn: &Connection,
114        process_id: &str,
115    ) -> Result<Option<ProcessLease>, lash_core::PluginError> {
116        conn.query_row(
117            "SELECT lease_owner_id, lease_token, lease_fencing_token,
118                    lease_claimed_at_ms, lease_expires_at_ms
119             FROM process_leases
120             WHERE process_id = ?1",
121            params![process_id],
122            |row| {
123                let owner_id: Option<String> = row.get(0)?;
124                let lease_token: Option<String> = row.get(1)?;
125                let (Some(owner_id), Some(lease_token)) = (owner_id, lease_token) else {
126                    return Ok(None);
127                };
128                Ok(Some(ProcessLease {
129                    schema_version: PROCESS_LEASE_SCHEMA_VERSION,
130                    process_id: process_id.to_string(),
131                    owner_id,
132                    lease_token,
133                    fencing_token: row.get::<_, i64>(2)? as u64,
134                    claimed_at_epoch_ms: row.get::<_, i64>(3)? as u64,
135                    expires_at_epoch_ms: row.get::<_, i64>(4)? as u64,
136                }))
137            },
138        )
139        .optional()
140        .map(|lease| lease.flatten())
141        .map_err(process_sqlite_error)
142    }
143
144    fn list_grants_for_scope_conn(
145        conn: &Connection,
146        session_scope: &SessionScope,
147        live_only: bool,
148    ) -> Result<Vec<ProcessHandleGrantEntry>, lash_core::PluginError> {
149        let session_scope_id = session_scope.id();
150        let status_clause = if live_only {
151            "AND p.status = 'running'"
152        } else {
153            ""
154        };
155        let mut stmt = conn
156            .prepare(&format!(
157                "SELECT g.process_id, g.descriptor_json, p.record_json
158                 FROM process_handle_grants g
159                 JOIN processes p ON p.process_id = g.process_id
160                 WHERE g.scope_id = ?1 {status_clause}
161                 ORDER BY g.process_id ASC"
162            ))
163            .map_err(process_sqlite_error)?;
164        let rows = stmt
165            .query_map(params![session_scope_id.as_str()], |row| {
166                Ok((
167                    row.get::<_, String>(0)?,
168                    row.get::<_, String>(1)?,
169                    row.get::<_, String>(2)?,
170                ))
171            })
172            .map_err(process_sqlite_error)?;
173        let mut entries = Vec::new();
174        for row in rows {
175            let (process_id, descriptor_json, record_json) = row.map_err(process_sqlite_error)?;
176            let descriptor: ProcessHandleDescriptor =
177                serde_json::from_str(&descriptor_json).map_err(process_decode_error)?;
178            let record: ProcessRecord =
179                serde_json::from_str(&record_json).map_err(process_decode_error)?;
180            entries.push((
181                ProcessHandleGrant {
182                    session_id: session_scope.session_id.clone(),
183                    process_id,
184                    descriptor,
185                },
186                record,
187            ));
188        }
189        Ok(entries)
190    }
191}
192
193/// Map a `Result<T, PluginError>` produced by a synchronous transaction body to
194/// a [`TxOutcome`]: commit on success, roll back on logical error. Both arms
195/// carry the inner `Result` back so the caller recovers the value or the
196/// `PluginError` after the transaction resolves.
197fn tx_outcome<T>(
198    result: Result<T, lash_core::PluginError>,
199) -> TxOutcome<Result<T, lash_core::PluginError>> {
200    match result {
201        Ok(value) => TxOutcome::Commit(Ok(value)),
202        Err(err) => TxOutcome::Rollback(Err(err)),
203    }
204}
205
206#[async_trait::async_trait]
207impl ProcessRegistry for SqliteProcessRegistry {
208    fn durability_tier(&self) -> DurabilityTier {
209        DurabilityTier::Durable
210    }
211
212    async fn register_process(
213        &self,
214        registration: ProcessRegistration,
215    ) -> Result<ProcessRecord, lash_core::PluginError> {
216        let (registration, registration_hash) = prepare_process_registration(registration)?;
217        let record = self
218            .conn
219            .write_flow(move |tx| {
220                Ok(tx_outcome((|| {
221                    if let Some(existing) = Self::load_process_conn(tx, &registration.id)? {
222                        if existing.registration_hash == registration_hash {
223                            return Ok(existing);
224                        }
225                        return Err(lash_core::PluginError::Session(format!(
226                            "process `{}` registration hash conflict: existing {}, new {}",
227                            registration.id, existing.registration_hash, registration_hash
228                        )));
229                    }
230                    let now = current_epoch_ms();
231                    let record = ProcessRecord::from_prepared_registration(
232                        registration,
233                        registration_hash,
234                        now,
235                    );
236                    let originator_scope_id = record.originator_scope_id();
237                    tx.execute(
238                        "INSERT INTO processes (
239                            process_id, registration_hash, owner_scope_id, host_profile_id,
240                            created_at_ms, updated_at_ms, status, record_json
241                         )
242                         VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8)",
243                        params![
244                            record.id.as_str(),
245                            record.registration_hash.as_str(),
246                            originator_scope_id.as_str(),
247                            record.host_profile_id(),
248                            record.created_at_ms as i64,
249                            record.updated_at_ms as i64,
250                            process_status_label(&record),
251                            process_encode_json(&record)?,
252                        ],
253                    )
254                    .map_err(process_sqlite_error)?;
255                    Ok(record)
256                })()))
257            })
258            .await
259            .map_err(process_sqlite_error)??;
260        self.notify.notify_waiters();
261        Ok(record)
262    }
263
264    async fn set_external_ref(
265        &self,
266        process_id: &str,
267        external_ref: ProcessExternalRef,
268    ) -> Result<ProcessRecord, lash_core::PluginError> {
269        let process_id = process_id.to_string();
270        let record = self
271            .conn
272            .write_flow(move |tx| {
273                Ok(tx_outcome((|| {
274                    let mut record =
275                        Self::load_process_conn(tx, &process_id)?.ok_or_else(|| {
276                            lash_core::PluginError::Session(format!(
277                                "unknown process `{process_id}`"
278                            ))
279                        })?;
280                    record.external_ref = Some(external_ref);
281                    record.updated_at_ms = current_epoch_ms();
282                    Self::save_process_conn(tx, &record)?;
283                    Ok(record)
284                })()))
285            })
286            .await
287            .map_err(process_sqlite_error)??;
288        self.notify.notify_waiters();
289        Ok(record)
290    }
291
292    async fn grant_handle(
293        &self,
294        session_scope: &SessionScope,
295        process_id: &str,
296        descriptor: ProcessHandleDescriptor,
297    ) -> Result<ProcessHandleGrant, lash_core::PluginError> {
298        let session_scope = session_scope.clone();
299        let process_id = process_id.to_string();
300        self.conn
301            .write_flow(move |tx| {
302                Ok(tx_outcome((|| {
303                    let session_scope_id = session_scope.id();
304                    if Self::load_process_conn(tx, &process_id)?.is_none() {
305                        return Err(lash_core::PluginError::Session(format!(
306                            "unknown process `{process_id}`"
307                        )));
308                    }
309                    tx.execute(
310                        "INSERT INTO process_handle_grants (session_id, scope_id, process_id, descriptor_json)
311                         VALUES (?1, ?2, ?3, ?4)
312                         ON CONFLICT(scope_id, process_id) DO UPDATE SET
313                            session_id = excluded.session_id,
314                            descriptor_json = excluded.descriptor_json",
315                        params![
316                            session_scope.session_id.as_str(),
317                            session_scope_id.as_str(),
318                            process_id.as_str(),
319                            process_encode_json(&descriptor)?
320                        ],
321                    )
322                    .map_err(process_sqlite_error)?;
323                    Ok(ProcessHandleGrant {
324                        session_id: session_scope.session_id.clone(),
325                        process_id: process_id.clone(),
326                        descriptor,
327                    })
328                })()))
329            })
330            .await
331            .map_err(process_sqlite_error)?
332    }
333
334    async fn revoke_handle(
335        &self,
336        session_scope: &SessionScope,
337        process_id: &str,
338    ) -> Result<(), lash_core::PluginError> {
339        let session_scope_id = session_scope.id().as_str().to_string();
340        let process_id = process_id.to_string();
341        self.conn
342            .call(move |conn| {
343                conn.execute(
344                    "DELETE FROM process_handle_grants WHERE scope_id = ?1 AND process_id = ?2",
345                    params![session_scope_id, process_id],
346                )
347            })
348            .await
349            .map_err(process_sqlite_error)?;
350        Ok(())
351    }
352
353    async fn transfer_handle_grants(
354        &self,
355        from_scope: &SessionScope,
356        to_scope: &SessionScope,
357        process_ids: &[String],
358    ) -> Result<(), lash_core::PluginError> {
359        let from_scope = from_scope.clone();
360        let to_scope = to_scope.clone();
361        let process_ids = process_ids.to_vec();
362        self.conn
363            .write_flow(move |tx| {
364                Ok(tx_outcome((|| {
365                    let from_scope_id = from_scope.id();
366                    let to_scope_id = to_scope.id();
367                    for process_id in &process_ids {
368                        let descriptor_json: Option<String> = tx
369                            .query_row(
370                                "SELECT descriptor_json
371                                 FROM process_handle_grants
372                                 WHERE scope_id = ?1 AND process_id = ?2",
373                                params![from_scope_id.as_str(), process_id.as_str()],
374                                |row| row.get(0),
375                            )
376                            .optional()
377                            .map_err(process_sqlite_error)?;
378                        let Some(descriptor_json) = descriptor_json else {
379                            return Err(lash_core::PluginError::Session(format!(
380                                "process handle `{process_id}` is not granted to session `{}`",
381                                from_scope.session_id
382                            )));
383                        };
384                        tx.execute(
385                            "DELETE FROM process_handle_grants
386                             WHERE scope_id = ?1 AND process_id = ?2",
387                            params![from_scope_id.as_str(), process_id.as_str()],
388                        )
389                        .map_err(process_sqlite_error)?;
390                        tx.execute(
391                            "INSERT INTO process_handle_grants (session_id, scope_id, process_id, descriptor_json)
392                             VALUES (?1, ?2, ?3, ?4)
393                             ON CONFLICT(scope_id, process_id) DO UPDATE SET
394                                session_id = excluded.session_id,
395                                descriptor_json = excluded.descriptor_json",
396                            params![
397                                to_scope.session_id.as_str(),
398                                to_scope_id.as_str(),
399                                process_id.as_str(),
400                                descriptor_json
401                            ],
402                        )
403                        .map_err(process_sqlite_error)?;
404                    }
405                    Ok(())
406                })()))
407            })
408            .await
409            .map_err(process_sqlite_error)?
410    }
411
412    async fn list_handle_grants(
413        &self,
414        session_scope: &SessionScope,
415    ) -> Result<Vec<ProcessHandleGrantEntry>, lash_core::PluginError> {
416        let session_scope = session_scope.clone();
417        self.conn
418            .call(move |conn| {
419                Ok(Self::list_grants_for_scope_conn(
420                    conn,
421                    &session_scope,
422                    false,
423                ))
424            })
425            .await
426            .map_err(process_sqlite_error)?
427    }
428
429    async fn list_live_handle_grants(
430        &self,
431        session_scope: &SessionScope,
432    ) -> Result<Vec<ProcessHandleGrantEntry>, lash_core::PluginError> {
433        let session_scope = session_scope.clone();
434        self.conn
435            .call(move |conn| Ok(Self::list_grants_for_scope_conn(conn, &session_scope, true)))
436            .await
437            .map_err(process_sqlite_error)?
438    }
439
440    async fn has_handle_grant(
441        &self,
442        session_scope: &SessionScope,
443        process_id: &str,
444    ) -> Result<bool, lash_core::PluginError> {
445        let session_scope_id = session_scope.id().as_str().to_string();
446        let process_id = process_id.to_string();
447        self.conn
448            .call(move |conn| {
449                let exists = conn
450                    .query_row(
451                        "SELECT 1
452                         FROM process_handle_grants g
453                         JOIN processes p ON p.process_id = g.process_id
454                         WHERE g.scope_id = ?1 AND g.process_id = ?2
455                         LIMIT 1",
456                        params![session_scope_id, process_id],
457                        |_| Ok(()),
458                    )
459                    .optional()?
460                    .is_some();
461                Ok(exists)
462            })
463            .await
464            .map_err(process_sqlite_error)
465    }
466
467    async fn handle_grants_for_process(
468        &self,
469        process_id: &str,
470    ) -> Result<Vec<ProcessHandleGrant>, lash_core::PluginError> {
471        let process_id = process_id.to_string();
472        self.conn
473            .call(move |conn| {
474                Ok((|| {
475                    if Self::load_process_conn(conn, &process_id)?.is_none() {
476                        return Err(lash_core::PluginError::Session(format!(
477                            "unknown process `{process_id}`"
478                        )));
479                    }
480                    let mut stmt = conn
481                        .prepare(
482                            "SELECT session_id, descriptor_json
483                             FROM process_handle_grants
484                             WHERE process_id = ?1
485                             ORDER BY session_id ASC, scope_id ASC",
486                        )
487                        .map_err(process_sqlite_error)?;
488                    let rows = stmt
489                        .query_map(params![process_id], |row| {
490                            Ok((row.get::<_, String>(0)?, row.get::<_, String>(1)?))
491                        })
492                        .map_err(process_sqlite_error)?;
493                    let mut grants = Vec::new();
494                    for row in rows {
495                        let (session_id, descriptor_json) = row.map_err(process_sqlite_error)?;
496                        let descriptor: ProcessHandleDescriptor =
497                            serde_json::from_str(&descriptor_json).map_err(process_decode_error)?;
498                        grants.push(ProcessHandleGrant {
499                            session_id,
500                            process_id: process_id.clone(),
501                            descriptor,
502                        });
503                    }
504                    Ok(grants)
505                })())
506            })
507            .await
508            .map_err(process_sqlite_error)?
509    }
510
511    async fn delete_session_process_state(
512        &self,
513        session_id: &str,
514    ) -> Result<lash_core::ProcessSessionDeleteReport, lash_core::PluginError> {
515        let session_id_owned = session_id.to_string();
516        let (
517            revoked_handle_count,
518            deleted_wake_count,
519            mut orphaned_process_ids,
520            mut preserved_process_ids,
521        ) = self
522            .conn
523            .write_flow(move |tx| {
524                Ok(tx_outcome((|| {
525                    let session_id = session_id_owned;
526                    let removed = {
527                        let mut stmt = tx
528                            .prepare(
529                                "SELECT g.process_id, p.record_json
530                                 FROM process_handle_grants g
531                                 JOIN processes p ON p.process_id = g.process_id
532                                 WHERE g.session_id = ?1
533                                 ORDER BY g.process_id ASC",
534                            )
535                            .map_err(process_sqlite_error)?;
536                        let rows = stmt
537                            .query_map(params![session_id], |row| {
538                                Ok((row.get::<_, String>(0)?, row.get::<_, String>(1)?))
539                            })
540                            .map_err(process_sqlite_error)?;
541                        let mut removed = Vec::new();
542                        for row in rows {
543                            let (process_id, record_json) = row.map_err(process_sqlite_error)?;
544                            let record: ProcessRecord =
545                                serde_json::from_str(&record_json).map_err(process_decode_error)?;
546                            removed.push((process_id, record));
547                        }
548                        removed
549                    };
550
551                    // Wake acknowledgements are process-scoped consumed-event markers.
552                    // Session deletion removes materialized session-addressed deliveries
553                    // through the session store; clearing these rows would re-expose
554                    // already-consumed wakes to surviving grants or future host readers.
555                    let deleted_wake_count = 0;
556                    let revoked_handle_count = tx
557                        .execute(
558                            "DELETE FROM process_handle_grants WHERE session_id = ?1",
559                            params![session_id],
560                        )
561                        .map_err(process_sqlite_error)?;
562                    let mut orphaned_process_ids = Vec::new();
563                    let mut preserved_process_ids = Vec::new();
564                    for (process_id, record) in removed {
565                        if record.is_terminal() {
566                            continue;
567                        }
568                        let remaining_grants: i64 = tx
569                            .query_row(
570                                "SELECT COUNT(*) FROM process_handle_grants WHERE process_id = ?1",
571                                params![process_id],
572                                |row| row.get(0),
573                            )
574                            .map_err(process_sqlite_error)?;
575                        if remaining_grants == 0 {
576                            orphaned_process_ids.push(process_id);
577                        } else {
578                            preserved_process_ids.push(process_id);
579                        }
580                    }
581                    Ok((
582                        revoked_handle_count,
583                        deleted_wake_count,
584                        orphaned_process_ids,
585                        preserved_process_ids,
586                    ))
587                })()))
588            })
589            .await
590            .map_err(process_sqlite_error)??;
591        orphaned_process_ids.sort();
592        orphaned_process_ids.dedup();
593        preserved_process_ids.sort();
594        preserved_process_ids.dedup();
595        Ok(lash_core::ProcessSessionDeleteReport {
596            session_id: session_id.to_string(),
597            revoked_handle_count,
598            deleted_wake_count,
599            orphaned_process_ids,
600            preserved_process_ids,
601        })
602    }
603
604    async fn append_event(
605        &self,
606        process_id: &str,
607        request: ProcessEventAppendRequest,
608    ) -> Result<ProcessEventAppendResult, lash_core::PluginError> {
609        let process_id = process_id.to_string();
610        let (result, appended) = self
611            .conn
612            .write_flow(move |tx| {
613                Ok(tx_outcome((|| {
614                    let mut record =
615                        Self::load_process_conn(tx, &process_id)?.ok_or_else(|| {
616                            lash_core::PluginError::Session(format!(
617                                "unknown process `{process_id}`"
618                            ))
619                        })?;
620                    let replay_lookup = if let Some(replay_key) =
621                        request.replay.as_ref().map(|replay| replay.key.as_str())
622                    {
623                        Self::load_event_by_key_conn(tx, &process_id, replay_key)?
624                    } else {
625                        None
626                    };
627                    let sequence = tx
628                        .query_row(
629                            "SELECT COALESCE(MAX(sequence), 0) + 1 FROM process_events WHERE process_id = ?1",
630                            params![process_id],
631                            |row| row.get::<_, i64>(0),
632                        )
633                        .map_err(process_sqlite_error)? as u64;
634                    let occurred_at_ms = current_epoch_ms();
635                    let prepared = prepare_process_event_append(
636                        &record,
637                        request,
638                        sequence,
639                        replay_lookup,
640                        occurred_at_ms,
641                    )?;
642                    if prepared.replayed {
643                        return Ok((
644                            ProcessEventAppendResult {
645                                event: prepared.event,
646                                wake_delivery: prepared.wake_delivery,
647                            },
648                            false,
649                        ));
650                    }
651                    let event = prepared.event;
652                    tx.execute(
653                        "INSERT INTO process_events (
654                            process_id, sequence, event_type, payload_hash, idempotency_key,
655                            occurred_at_ms, event_json
656                         )
657                         VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7)",
658                        params![
659                            process_id,
660                            sequence as i64,
661                            event.event_type.as_str(),
662                            prepared.payload_hash.as_str(),
663                            event.invocation.replay_key(),
664                            prepared.occurred_at_ms as i64,
665                            process_encode_json(&event)?,
666                        ],
667                    )
668                    .map_err(process_sqlite_error)?;
669                    if let Some(status) = prepared.status_update.clone() {
670                        record.status = status;
671                        if record.status.is_terminal() {
672                            record.wait = None;
673                        }
674                    }
675                    record.updated_at_ms = prepared.occurred_at_ms;
676                    Self::save_process_conn(tx, &record)?;
677                    Ok((
678                        ProcessEventAppendResult {
679                            event,
680                            wake_delivery: prepared.wake_delivery,
681                        },
682                        true,
683                    ))
684                })()))
685            })
686            .await
687            .map_err(process_sqlite_error)??;
688        if appended {
689            self.notify.notify_waiters();
690        }
691        Ok(result)
692    }
693
694    async fn events_after(
695        &self,
696        process_id: &str,
697        after_sequence: u64,
698    ) -> Result<Vec<ProcessEvent>, lash_core::PluginError> {
699        let process_id = process_id.to_string();
700        self.conn
701            .call(move |conn| {
702                Ok((|| {
703                    if Self::load_process_conn(conn, &process_id)?.is_none() {
704                        return Err(lash_core::PluginError::Session(format!(
705                            "unknown process `{process_id}`"
706                        )));
707                    }
708                    let mut stmt = conn
709                        .prepare(
710                            "SELECT event_json FROM process_events
711                             WHERE process_id = ?1 AND sequence > ?2
712                             ORDER BY sequence ASC",
713                        )
714                        .map_err(process_sqlite_error)?;
715                    let rows = stmt
716                        .query_map(params![process_id, after_sequence as i64], |row| {
717                            row.get::<_, String>(0)
718                        })
719                        .map_err(process_sqlite_error)?;
720                    let mut events = Vec::new();
721                    for row in rows {
722                        events.push(
723                            serde_json::from_str(&row.map_err(process_sqlite_error)?)
724                                .map_err(process_decode_error)?,
725                        );
726                    }
727                    Ok(events)
728                })())
729            })
730            .await
731            .map_err(process_sqlite_error)?
732    }
733
734    async fn wake_events_after(
735        &self,
736        process_id: &str,
737        after_sequence: u64,
738    ) -> Result<Vec<ProcessEvent>, lash_core::PluginError> {
739        let acked: std::collections::HashSet<u64> = {
740            let process_id = process_id.to_string();
741            self.conn
742                .call(move |conn| {
743                    Ok(
744                        (|| -> Result<std::collections::HashSet<u64>, lash_core::PluginError> {
745                            let mut stmt = conn
746                                .prepare(
747                                    "SELECT sequence FROM process_wake_acks WHERE process_id = ?1",
748                                )
749                                .map_err(process_sqlite_error)?;
750                            let rows = stmt
751                                .query_map(params![process_id], |row| row.get::<_, i64>(0))
752                                .map_err(process_sqlite_error)?;
753                            let mut set = std::collections::HashSet::new();
754                            for row in rows {
755                                set.insert(row.map_err(process_sqlite_error)? as u64);
756                            }
757                            Ok(set)
758                        })(),
759                    )
760                })
761                .await
762                .map_err(process_sqlite_error)??
763        };
764        Ok(self
765            .events_after(process_id, after_sequence)
766            .await?
767            .into_iter()
768            .filter(|event| event.semantics.wake.is_some() && !acked.contains(&event.sequence))
769            .collect())
770    }
771
772    async fn wait_event_after(
773        &self,
774        process_id: &str,
775        event_type: &str,
776        after_sequence: u64,
777    ) -> Result<ProcessEvent, lash_core::PluginError> {
778        loop {
779            if let Some(event) = self
780                .events_after(process_id, after_sequence)
781                .await?
782                .into_iter()
783                .find(|event| event.event_type == event_type)
784            {
785                return Ok(event);
786            }
787            tokio::select! {
788                _ = self.notify.notified() => {}
789                _ = tokio::time::sleep(Duration::from_millis(50)) => {}
790            }
791        }
792    }
793
794    async fn await_process(
795        &self,
796        process_id: &str,
797    ) -> Result<ProcessAwaitOutput, lash_core::PluginError> {
798        loop {
799            let record = self.get_process(process_id).await.ok_or_else(|| {
800                lash_core::PluginError::Session(format!("unknown process `{process_id}`"))
801            })?;
802            if let Some(await_output) = record.status.await_output() {
803                return Ok(await_output.clone());
804            }
805            tokio::select! {
806                _ = self.notify.notified() => {}
807                _ = tokio::time::sleep(Duration::from_millis(50)) => {}
808            }
809        }
810    }
811
812    async fn complete_process(
813        &self,
814        process_id: &str,
815        await_output: ProcessAwaitOutput,
816    ) -> Result<ProcessRecord, lash_core::PluginError> {
817        let event_type = match await_output.terminal_state() {
818            lash_core::ProcessTerminalState::Completed => "process.completed",
819            lash_core::ProcessTerminalState::Failed => "process.failed",
820            lash_core::ProcessTerminalState::Cancelled => "process.cancelled",
821        };
822        self.append_event(
823            process_id,
824            ProcessEventAppendRequest::new(
825                event_type,
826                serde_json::json!({ "await_output": await_output }),
827            )
828            .with_replay_key(format!("process:{process_id}:terminal:{event_type}")),
829        )
830        .await?;
831        self.get_process(process_id).await.ok_or_else(|| {
832            lash_core::PluginError::Session(format!(
833                "unknown process `{process_id}` after terminal event"
834            ))
835        })
836    }
837
838    async fn set_process_wait(
839        &self,
840        process_id: &str,
841        wait: lash_core::WaitState,
842    ) -> Result<ProcessRecord, lash_core::PluginError> {
843        let process_id = process_id.to_string();
844        self.conn
845            .write_flow(move |tx| {
846                Ok(tx_outcome((|| {
847                    let mut record =
848                        Self::load_process_conn(tx, &process_id)?.ok_or_else(|| {
849                            lash_core::PluginError::Session(format!(
850                                "unknown process `{process_id}`"
851                            ))
852                        })?;
853                    if record.is_terminal() {
854                        return Err(lash_core::PluginError::Session(format!(
855                            "terminal process `{process_id}` cannot enter a wait state"
856                        )));
857                    }
858                    record.wait = Some(wait);
859                    record.updated_at_ms = current_epoch_ms();
860                    Self::save_process_conn(tx, &record)?;
861                    Ok(record)
862                })()))
863            })
864            .await
865            .map_err(process_sqlite_error)?
866    }
867
868    async fn clear_process_wait(
869        &self,
870        process_id: &str,
871    ) -> Result<ProcessRecord, lash_core::PluginError> {
872        let process_id = process_id.to_string();
873        self.conn
874            .write_flow(move |tx| {
875                Ok(tx_outcome((|| {
876                    let mut record =
877                        Self::load_process_conn(tx, &process_id)?.ok_or_else(|| {
878                            lash_core::PluginError::Session(format!(
879                                "unknown process `{process_id}`"
880                            ))
881                        })?;
882                    record.wait = None;
883                    record.updated_at_ms = current_epoch_ms();
884                    Self::save_process_conn(tx, &record)?;
885                    Ok(record)
886                })()))
887            })
888            .await
889            .map_err(process_sqlite_error)?
890    }
891
892    async fn get_process(&self, process_id: &str) -> Option<ProcessRecord> {
893        let process_id = process_id.to_string();
894        self.conn
895            .call(move |conn| Ok(Self::load_process_conn(conn, &process_id).ok().flatten()))
896            .await
897            .ok()
898            .flatten()
899    }
900
901    async fn list_processes(
902        &self,
903        filter: &lash_core::ProcessListFilter,
904    ) -> Result<Vec<ProcessRecord>, lash_core::PluginError> {
905        let filter = filter.clone();
906        self.conn
907            .call(move |conn| {
908                Ok((|| {
909                    let mut stmt = conn
910                        .prepare(
911                            "SELECT record_json FROM processes
912                             ORDER BY process_id ASC",
913                        )
914                        .map_err(process_sqlite_error)?;
915                    let rows = stmt
916                        .query_map([], |row| row.get::<_, String>(0))
917                        .map_err(process_sqlite_error)?;
918                    let mut records = Vec::new();
919                    for row in rows {
920                        let record: ProcessRecord =
921                            serde_json::from_str(&row.map_err(process_sqlite_error)?)
922                                .map_err(process_decode_error)?;
923                        if filter.matches_record(&record) {
924                            records.push(record);
925                        }
926                    }
927                    Ok(records)
928                })())
929            })
930            .await
931            .map_err(process_sqlite_error)?
932    }
933
934    async fn ack_wake(
935        &self,
936        process_id: &str,
937        sequence: u64,
938    ) -> Result<(), lash_core::PluginError> {
939        let process_id = process_id.to_string();
940        self.conn
941            .call(move |conn| {
942                Ok((|| {
943                    if Self::load_process_conn(conn, &process_id)?.is_none() {
944                        return Err(lash_core::PluginError::Session(format!(
945                            "unknown process `{process_id}`"
946                        )));
947                    }
948                    conn.execute(
949                        "INSERT OR IGNORE INTO process_wake_acks (process_id, sequence) VALUES (?1, ?2)",
950                        params![process_id, sequence as i64],
951                    )
952                    .map_err(process_sqlite_error)?;
953                    Ok(())
954                })())
955            })
956            .await
957            .map_err(process_sqlite_error)?
958    }
959
960    async fn list_non_terminal(&self) -> Result<Vec<ProcessRecord>, lash_core::PluginError> {
961        self.conn
962            .call(move |conn| {
963                Ok((|| {
964                    let mut stmt = conn
965                        .prepare(
966                            "SELECT record_json FROM processes
967                             WHERE status = 'running'
968                             ORDER BY process_id ASC",
969                        )
970                        .map_err(process_sqlite_error)?;
971                    let rows = stmt
972                        .query_map([], |row| row.get::<_, String>(0))
973                        .map_err(process_sqlite_error)?;
974                    let mut records = Vec::new();
975                    for row in rows {
976                        let record: ProcessRecord =
977                            serde_json::from_str(&row.map_err(process_sqlite_error)?)
978                                .map_err(process_decode_error)?;
979                        records.push(record);
980                    }
981                    Ok(records)
982                })())
983            })
984            .await
985            .map_err(process_sqlite_error)?
986    }
987
988    async fn claim_process_lease(
989        &self,
990        process_id: &str,
991        owner_id: &str,
992        lease_ttl_ms: u64,
993    ) -> Result<ProcessLease, lash_core::PluginError> {
994        let process_id = process_id.to_string();
995        let owner_id = owner_id.to_string();
996        self.conn
997            .write_flow(move |tx| {
998                Ok(tx_outcome((|| {
999                    if Self::load_process_conn(tx, &process_id)?.is_none() {
1000                        return Err(lash_core::PluginError::Session(format!(
1001                            "unknown process `{process_id}`"
1002                        )));
1003                    }
1004                    let now = current_epoch_ms();
1005                    let current = Self::load_process_lease_conn(tx, &process_id)?;
1006                    if let Some(current) = current.as_ref()
1007                        && current.expires_at_epoch_ms > now
1008                        && current.owner_id != owner_id
1009                    {
1010                        return Err(process_lease_conflict(&process_id, current));
1011                    }
1012                    // Read the raw fencing token directly: a completed/abandoned
1013                    // lease nulls the owner/token columns but retains the
1014                    // monotonically-increasing `lease_fencing_token`, so a
1015                    // re-claim never reuses a stale writer's token.
1016                    let fencing_token: u64 = tx
1017                        .query_row(
1018                            "SELECT lease_fencing_token FROM process_leases WHERE process_id = ?1",
1019                            params![process_id],
1020                            |row| row.get::<_, i64>(0),
1021                        )
1022                        .optional()
1023                        .map_err(process_sqlite_error)?
1024                        .unwrap_or(0) as u64
1025                        + 1;
1026                    let lease = ProcessLease {
1027                        schema_version: PROCESS_LEASE_SCHEMA_VERSION,
1028                        process_id: process_id.clone(),
1029                        owner_id: owner_id.clone(),
1030                        lease_token: format!(
1031                            "{:x}",
1032                            Sha256::digest(
1033                                format!("{process_id}:{owner_id}:{now}:{fencing_token}").as_bytes()
1034                            )
1035                        ),
1036                        fencing_token,
1037                        claimed_at_epoch_ms: now,
1038                        expires_at_epoch_ms: now.saturating_add(lease_ttl_ms),
1039                    };
1040                    tx.execute(
1041                        "INSERT INTO process_leases (
1042                            process_id, lease_owner_id, lease_token, lease_fencing_token,
1043                            lease_claimed_at_ms, lease_expires_at_ms
1044                         )
1045                         VALUES (?1, ?2, ?3, ?4, ?5, ?6)
1046                         ON CONFLICT(process_id) DO UPDATE SET
1047                            lease_owner_id = excluded.lease_owner_id,
1048                            lease_token = excluded.lease_token,
1049                            lease_fencing_token = excluded.lease_fencing_token,
1050                            lease_claimed_at_ms = excluded.lease_claimed_at_ms,
1051                            lease_expires_at_ms = excluded.lease_expires_at_ms",
1052                        params![
1053                            lease.process_id.as_str(),
1054                            lease.owner_id.as_str(),
1055                            lease.lease_token.as_str(),
1056                            lease.fencing_token as i64,
1057                            lease.claimed_at_epoch_ms as i64,
1058                            lease.expires_at_epoch_ms as i64,
1059                        ],
1060                    )
1061                    .map_err(process_sqlite_error)?;
1062                    Ok(lease)
1063                })()))
1064            })
1065            .await
1066            .map_err(process_sqlite_error)?
1067    }
1068
1069    async fn renew_process_lease(
1070        &self,
1071        lease: &ProcessLease,
1072        lease_ttl_ms: u64,
1073    ) -> Result<ProcessLease, lash_core::PluginError> {
1074        let lease = lease.clone();
1075        self.conn
1076            .write_flow(move |tx| {
1077                Ok(tx_outcome((|| {
1078                    let now = current_epoch_ms();
1079                    let current = Self::load_process_lease_conn(tx, &lease.process_id)?;
1080                    if !guard_lease(current.as_ref(), &lease.lease_token, now) {
1081                        return Err(process_lease_expired(&lease.process_id));
1082                    }
1083                    let renewed = ProcessLease {
1084                        expires_at_epoch_ms: now.saturating_add(lease_ttl_ms),
1085                        ..lease.clone()
1086                    };
1087                    tx.execute(
1088                        "UPDATE process_leases
1089                         SET lease_expires_at_ms = ?2
1090                         WHERE process_id = ?1 AND lease_token = ?3",
1091                        params![
1092                            renewed.process_id.as_str(),
1093                            renewed.expires_at_epoch_ms as i64,
1094                            renewed.lease_token.as_str(),
1095                        ],
1096                    )
1097                    .map_err(process_sqlite_error)?;
1098                    Ok(renewed)
1099                })()))
1100            })
1101            .await
1102            .map_err(process_sqlite_error)?
1103    }
1104
1105    async fn complete_process_lease(
1106        &self,
1107        completion: &ProcessLeaseCompletion,
1108    ) -> Result<(), lash_core::PluginError> {
1109        let process_id = completion.process_id.clone();
1110        let lease_token = completion.lease_token.clone();
1111        self.conn
1112            .call(move |conn| {
1113                conn.execute(
1114                    "UPDATE process_leases
1115                     SET lease_owner_id = NULL,
1116                         lease_token = NULL,
1117                         lease_claimed_at_ms = 0,
1118                         lease_expires_at_ms = 0
1119                     WHERE process_id = ?1 AND lease_token = ?2",
1120                    params![process_id, lease_token],
1121                )
1122            })
1123            .await
1124            .map_err(process_sqlite_error)?;
1125        Ok(())
1126    }
1127}
1128
1129/// Loud, stable error for a fenced process-lease claim on the `PluginError`
1130/// channel the [`ProcessRegistry`] trait returns.
1131fn process_lease_conflict(process_id: &str, current: &ProcessLease) -> lash_core::PluginError {
1132    lash_core::PluginError::Session(format!(
1133        "process `{process_id}` is already leased by `{}` until {}",
1134        current.owner_id, current.expires_at_epoch_ms
1135    ))
1136}
1137
1138/// Loud, stable error for a superseded or expired process lease.
1139fn process_lease_expired(process_id: &str) -> lash_core::PluginError {
1140    lash_core::PluginError::Session(format!(
1141        "process lease for `{process_id}` is missing or expired"
1142    ))
1143}