Skip to main content

lash_sqlite_store/
process_registry.rs

1//! SQLite-backed [`ProcessRegistry`] (`SqliteProcessRegistry`).
2//!
3//! Ported from the prior store, preserving the public async surface byte-for-byte
4//! (the type name keeps the `Sqlite` prefix so the path-rename swap keeps
5//! compiling). The prior implementation ran every op directly on `&rusqlite::Connection`
6//! with `.await`; here every DB body is a *synchronous* rusqlite closure handed
7//! to [`SqliteConnection::call`] (reads) or [`SqliteConnection::write_flow`]
8//! (read-then-write).
9//!
10//! ## Why `write_flow`, not `write`
11//!
12//! The registry's transactional methods produce a [`lash_core::PluginError`],
13//! not a `rusqlite::Error`. `SqliteConnection::write` rolls back only when the
14//! closure returns `Err(rusqlite::Error)`, so a logical `PluginError` (e.g. a
15//! registration-hash conflict) would otherwise *commit* the partial work. Each
16//! such method therefore runs its synchronous body returning
17//! `Result<T, PluginError>` and maps it to a [`TxOutcome`]: `Ok` ⇒
18//! `Commit(Ok(value))`, `Err` ⇒ `Rollback(Err(error))`. That preserves the
19//! prior behaviour of rolling back on every error while still carrying the
20//! `PluginError` back to the caller. The outer `rusqlite::Error` channel only
21//! carries genuine SQLite/connection failures, mapped via `process_sqlite_error`.
22//!
23//! The `*_conn` helpers are synchronous and take a `&rusqlite::Connection` so
24//! they compose inside either closure — including from within a `&Transaction`,
25//! which derefs to `&Connection`.
26
27use super::*;
28
29fn process_status_label(record: &ProcessRecord) -> &'static str {
30    record.status.label()
31}
32
33impl SqliteProcessRegistry {
34    pub async fn open(path: &Path) -> tokio_rusqlite::Result<Self> {
35        let conn = SqliteConnection::open(path).await?;
36        ensure_process_schema(&conn).await?;
37        apply_pragmas(&conn, StoreBacking::File).await?;
38        Ok(Self {
39            conn,
40            notify: tokio::sync::Notify::new(),
41        })
42    }
43
44    pub async fn memory() -> tokio_rusqlite::Result<Self> {
45        let conn = SqliteConnection::open_in_memory().await?;
46        ensure_process_schema(&conn).await?;
47        apply_pragmas(&conn, StoreBacking::Memory).await?;
48        Ok(Self {
49            conn,
50            notify: tokio::sync::Notify::new(),
51        })
52    }
53
54    fn load_process_conn(
55        conn: &Connection,
56        process_id: &str,
57    ) -> Result<Option<ProcessRecord>, lash_core::PluginError> {
58        let json: Option<String> = conn
59            .query_row(
60                "SELECT record_json FROM processes WHERE process_id = ?1",
61                params![process_id],
62                |row| row.get(0),
63            )
64            .optional()
65            .map_err(process_sqlite_error)?;
66        json.map(|json| serde_json::from_str(&json).map_err(process_decode_error))
67            .transpose()
68    }
69
70    fn save_process_conn(
71        conn: &Connection,
72        record: &ProcessRecord,
73    ) -> Result<(), lash_core::PluginError> {
74        conn.execute(
75            "UPDATE processes
76             SET updated_at_ms = ?2, status = ?3, record_json = ?4
77             WHERE process_id = ?1",
78            params![
79                record.id.as_str(),
80                record.updated_at_ms as i64,
81                process_status_label(record),
82                process_encode_json(record)?
83            ],
84        )
85        .map_err(process_sqlite_error)?;
86        Ok(())
87    }
88
89    fn load_event_by_key_conn(
90        conn: &Connection,
91        process_id: &str,
92        replay_key: &str,
93    ) -> Result<Option<(String, ProcessEvent)>, lash_core::PluginError> {
94        let row: Option<(String, String)> = conn
95            .query_row(
96                "SELECT payload_hash, event_json
97                 FROM process_events
98                 WHERE process_id = ?1 AND idempotency_key = ?2",
99                params![process_id, replay_key],
100                |row| Ok((row.get(0)?, row.get(1)?)),
101            )
102            .optional()
103            .map_err(process_sqlite_error)?;
104        row.map(|(hash, json)| {
105            serde_json::from_str(&json)
106                .map(|event| (hash, event))
107                .map_err(process_decode_error)
108        })
109        .transpose()
110    }
111
112    fn load_process_lease_conn(
113        conn: &Connection,
114        process_id: &str,
115    ) -> Result<Option<ProcessLease>, lash_core::PluginError> {
116        conn.query_row(
117            "SELECT lease_owner_id, lease_token, lease_fencing_token,
118                    lease_claimed_at_ms, lease_expires_at_ms
119             FROM process_leases
120             WHERE process_id = ?1",
121            params![process_id],
122            |row| {
123                let owner_id: Option<String> = row.get(0)?;
124                let lease_token: Option<String> = row.get(1)?;
125                let (Some(owner_id), Some(lease_token)) = (owner_id, lease_token) else {
126                    return Ok(None);
127                };
128                Ok(Some(ProcessLease {
129                    schema_version: PROCESS_LEASE_SCHEMA_VERSION,
130                    process_id: process_id.to_string(),
131                    owner_id,
132                    lease_token,
133                    fencing_token: row.get::<_, i64>(2)? as u64,
134                    claimed_at_epoch_ms: row.get::<_, i64>(3)? as u64,
135                    expires_at_epoch_ms: row.get::<_, i64>(4)? as u64,
136                }))
137            },
138        )
139        .optional()
140        .map(|lease| lease.flatten())
141        .map_err(process_sqlite_error)
142    }
143
144    fn list_grants_for_scope_conn(
145        conn: &Connection,
146        session_scope: &SessionScope,
147        live_only: bool,
148    ) -> Result<Vec<ProcessHandleGrantEntry>, lash_core::PluginError> {
149        let session_scope_id = session_scope.id();
150        let status_clause = if live_only {
151            "AND p.status = 'running'"
152        } else {
153            ""
154        };
155        let mut stmt = conn
156            .prepare(&format!(
157                "SELECT g.process_id, g.descriptor_json, p.record_json
158                 FROM process_handle_grants g
159                 JOIN processes p ON p.process_id = g.process_id
160                 WHERE g.scope_id = ?1 {status_clause}
161                 ORDER BY g.process_id ASC"
162            ))
163            .map_err(process_sqlite_error)?;
164        let rows = stmt
165            .query_map(params![session_scope_id.as_str()], |row| {
166                Ok((
167                    row.get::<_, String>(0)?,
168                    row.get::<_, String>(1)?,
169                    row.get::<_, String>(2)?,
170                ))
171            })
172            .map_err(process_sqlite_error)?;
173        let mut entries = Vec::new();
174        for row in rows {
175            let (process_id, descriptor_json, record_json) = row.map_err(process_sqlite_error)?;
176            let descriptor: ProcessHandleDescriptor =
177                serde_json::from_str(&descriptor_json).map_err(process_decode_error)?;
178            let record: ProcessRecord =
179                serde_json::from_str(&record_json).map_err(process_decode_error)?;
180            entries.push((
181                ProcessHandleGrant {
182                    session_id: session_scope.session_id.clone(),
183                    process_id,
184                    descriptor,
185                },
186                record,
187            ));
188        }
189        Ok(entries)
190    }
191}
192
193/// Map a `Result<T, PluginError>` produced by a synchronous transaction body to
194/// a [`TxOutcome`]: commit on success, roll back on logical error. Both arms
195/// carry the inner `Result` back so the caller recovers the value or the
196/// `PluginError` after the transaction resolves.
197fn tx_outcome<T>(
198    result: Result<T, lash_core::PluginError>,
199) -> TxOutcome<Result<T, lash_core::PluginError>> {
200    match result {
201        Ok(value) => TxOutcome::Commit(Ok(value)),
202        Err(err) => TxOutcome::Rollback(Err(err)),
203    }
204}
205
206#[async_trait::async_trait]
207impl ProcessRegistry for SqliteProcessRegistry {
208    fn durability_tier(&self) -> DurabilityTier {
209        DurabilityTier::Durable
210    }
211
212    async fn register_process(
213        &self,
214        registration: ProcessRegistration,
215    ) -> Result<ProcessRecord, lash_core::PluginError> {
216        let (registration, registration_hash) = prepare_process_registration(registration)?;
217        let record = self
218            .conn
219            .write_flow(move |tx| {
220                Ok(tx_outcome((|| {
221                    if let Some(existing) = Self::load_process_conn(tx, &registration.id)? {
222                        if existing.registration_hash == registration_hash {
223                            return Ok(existing);
224                        }
225                        return Err(lash_core::PluginError::Session(format!(
226                            "process `{}` registration hash conflict: existing {}, new {}",
227                            registration.id, existing.registration_hash, registration_hash
228                        )));
229                    }
230                    let now = current_epoch_ms();
231                    let record = ProcessRecord::from_prepared_registration(
232                        registration,
233                        registration_hash,
234                        now,
235                    );
236                    let originator_scope_id = record.originator_scope_id();
237                    tx.execute(
238                        "INSERT INTO processes (
239                            process_id, registration_hash, owner_scope_id, host_profile_id,
240                            created_at_ms, updated_at_ms, status, record_json
241                         )
242                         VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8)",
243                        params![
244                            record.id.as_str(),
245                            record.registration_hash.as_str(),
246                            originator_scope_id.as_str(),
247                            record.host_profile_id(),
248                            record.created_at_ms as i64,
249                            record.updated_at_ms as i64,
250                            process_status_label(&record),
251                            process_encode_json(&record)?,
252                        ],
253                    )
254                    .map_err(process_sqlite_error)?;
255                    Ok(record)
256                })()))
257            })
258            .await
259            .map_err(process_sqlite_error)??;
260        self.notify.notify_waiters();
261        Ok(record)
262    }
263
264    async fn set_external_ref(
265        &self,
266        process_id: &str,
267        external_ref: ProcessExternalRef,
268    ) -> Result<ProcessRecord, lash_core::PluginError> {
269        let process_id = process_id.to_string();
270        let record = self
271            .conn
272            .write_flow(move |tx| {
273                Ok(tx_outcome((|| {
274                    let mut record =
275                        Self::load_process_conn(tx, &process_id)?.ok_or_else(|| {
276                            lash_core::PluginError::Session(format!(
277                                "unknown process `{process_id}`"
278                            ))
279                        })?;
280                    record.external_ref = Some(external_ref);
281                    record.updated_at_ms = current_epoch_ms();
282                    Self::save_process_conn(tx, &record)?;
283                    Ok(record)
284                })()))
285            })
286            .await
287            .map_err(process_sqlite_error)??;
288        self.notify.notify_waiters();
289        Ok(record)
290    }
291
292    async fn grant_handle(
293        &self,
294        session_scope: &SessionScope,
295        process_id: &str,
296        descriptor: ProcessHandleDescriptor,
297    ) -> Result<ProcessHandleGrant, lash_core::PluginError> {
298        let session_scope = session_scope.clone();
299        let process_id = process_id.to_string();
300        self.conn
301            .write_flow(move |tx| {
302                Ok(tx_outcome((|| {
303                    let session_scope_id = session_scope.id();
304                    if Self::load_process_conn(tx, &process_id)?.is_none() {
305                        return Err(lash_core::PluginError::Session(format!(
306                            "unknown process `{process_id}`"
307                        )));
308                    }
309                    tx.execute(
310                        "INSERT INTO process_handle_grants (session_id, scope_id, process_id, descriptor_json)
311                         VALUES (?1, ?2, ?3, ?4)
312                         ON CONFLICT(scope_id, process_id) DO UPDATE SET
313                            session_id = excluded.session_id,
314                            descriptor_json = excluded.descriptor_json",
315                        params![
316                            session_scope.session_id.as_str(),
317                            session_scope_id.as_str(),
318                            process_id.as_str(),
319                            process_encode_json(&descriptor)?
320                        ],
321                    )
322                    .map_err(process_sqlite_error)?;
323                    Ok(ProcessHandleGrant {
324                        session_id: session_scope.session_id.clone(),
325                        process_id: process_id.clone(),
326                        descriptor,
327                    })
328                })()))
329            })
330            .await
331            .map_err(process_sqlite_error)?
332    }
333
334    async fn revoke_handle(
335        &self,
336        session_scope: &SessionScope,
337        process_id: &str,
338    ) -> Result<(), lash_core::PluginError> {
339        let session_scope_id = session_scope.id().as_str().to_string();
340        let process_id = process_id.to_string();
341        self.conn
342            .call(move |conn| {
343                conn.execute(
344                    "DELETE FROM process_handle_grants WHERE scope_id = ?1 AND process_id = ?2",
345                    params![session_scope_id, process_id],
346                )
347            })
348            .await
349            .map_err(process_sqlite_error)?;
350        Ok(())
351    }
352
353    async fn transfer_handle_grants(
354        &self,
355        from_scope: &SessionScope,
356        to_scope: &SessionScope,
357        process_ids: &[String],
358    ) -> Result<(), lash_core::PluginError> {
359        let from_scope = from_scope.clone();
360        let to_scope = to_scope.clone();
361        let process_ids = process_ids.to_vec();
362        self.conn
363            .write_flow(move |tx| {
364                Ok(tx_outcome((|| {
365                    let from_scope_id = from_scope.id();
366                    let to_scope_id = to_scope.id();
367                    for process_id in &process_ids {
368                        let descriptor_json: Option<String> = tx
369                            .query_row(
370                                "SELECT descriptor_json
371                                 FROM process_handle_grants
372                                 WHERE scope_id = ?1 AND process_id = ?2",
373                                params![from_scope_id.as_str(), process_id.as_str()],
374                                |row| row.get(0),
375                            )
376                            .optional()
377                            .map_err(process_sqlite_error)?;
378                        let Some(descriptor_json) = descriptor_json else {
379                            return Err(lash_core::PluginError::Session(format!(
380                                "process handle `{process_id}` is not granted to session `{}`",
381                                from_scope.session_id
382                            )));
383                        };
384                        tx.execute(
385                            "DELETE FROM process_handle_grants
386                             WHERE scope_id = ?1 AND process_id = ?2",
387                            params![from_scope_id.as_str(), process_id.as_str()],
388                        )
389                        .map_err(process_sqlite_error)?;
390                        tx.execute(
391                            "INSERT INTO process_handle_grants (session_id, scope_id, process_id, descriptor_json)
392                             VALUES (?1, ?2, ?3, ?4)
393                             ON CONFLICT(scope_id, process_id) DO UPDATE SET
394                                session_id = excluded.session_id,
395                                descriptor_json = excluded.descriptor_json",
396                            params![
397                                to_scope.session_id.as_str(),
398                                to_scope_id.as_str(),
399                                process_id.as_str(),
400                                descriptor_json
401                            ],
402                        )
403                        .map_err(process_sqlite_error)?;
404                    }
405                    Ok(())
406                })()))
407            })
408            .await
409            .map_err(process_sqlite_error)?
410    }
411
412    async fn list_handle_grants(
413        &self,
414        session_scope: &SessionScope,
415    ) -> Result<Vec<ProcessHandleGrantEntry>, lash_core::PluginError> {
416        let session_scope = session_scope.clone();
417        self.conn
418            .call(move |conn| {
419                Ok(Self::list_grants_for_scope_conn(
420                    conn,
421                    &session_scope,
422                    false,
423                ))
424            })
425            .await
426            .map_err(process_sqlite_error)?
427    }
428
429    async fn list_live_handle_grants(
430        &self,
431        session_scope: &SessionScope,
432    ) -> Result<Vec<ProcessHandleGrantEntry>, lash_core::PluginError> {
433        let session_scope = session_scope.clone();
434        self.conn
435            .call(move |conn| Ok(Self::list_grants_for_scope_conn(conn, &session_scope, true)))
436            .await
437            .map_err(process_sqlite_error)?
438    }
439
440    async fn has_handle_grant(
441        &self,
442        session_scope: &SessionScope,
443        process_id: &str,
444    ) -> Result<bool, lash_core::PluginError> {
445        let session_scope_id = session_scope.id().as_str().to_string();
446        let process_id = process_id.to_string();
447        self.conn
448            .call(move |conn| {
449                let exists = conn
450                    .query_row(
451                        "SELECT 1
452                         FROM process_handle_grants g
453                         JOIN processes p ON p.process_id = g.process_id
454                         WHERE g.scope_id = ?1 AND g.process_id = ?2
455                         LIMIT 1",
456                        params![session_scope_id, process_id],
457                        |_| Ok(()),
458                    )
459                    .optional()?
460                    .is_some();
461                Ok(exists)
462            })
463            .await
464            .map_err(process_sqlite_error)
465    }
466
467    async fn handle_grants_for_process(
468        &self,
469        process_id: &str,
470    ) -> Result<Vec<ProcessHandleGrant>, lash_core::PluginError> {
471        let process_id = process_id.to_string();
472        self.conn
473            .call(move |conn| {
474                Ok((|| {
475                    if Self::load_process_conn(conn, &process_id)?.is_none() {
476                        return Err(lash_core::PluginError::Session(format!(
477                            "unknown process `{process_id}`"
478                        )));
479                    }
480                    let mut stmt = conn
481                        .prepare(
482                            "SELECT session_id, descriptor_json
483                             FROM process_handle_grants
484                             WHERE process_id = ?1
485                             ORDER BY session_id ASC, scope_id ASC",
486                        )
487                        .map_err(process_sqlite_error)?;
488                    let rows = stmt
489                        .query_map(params![process_id], |row| {
490                            Ok((row.get::<_, String>(0)?, row.get::<_, String>(1)?))
491                        })
492                        .map_err(process_sqlite_error)?;
493                    let mut grants = Vec::new();
494                    for row in rows {
495                        let (session_id, descriptor_json) = row.map_err(process_sqlite_error)?;
496                        let descriptor: ProcessHandleDescriptor =
497                            serde_json::from_str(&descriptor_json).map_err(process_decode_error)?;
498                        grants.push(ProcessHandleGrant {
499                            session_id,
500                            process_id: process_id.clone(),
501                            descriptor,
502                        });
503                    }
504                    Ok(grants)
505                })())
506            })
507            .await
508            .map_err(process_sqlite_error)?
509    }
510
511    async fn delete_session_process_state(
512        &self,
513        session_id: &str,
514    ) -> Result<lash_core::ProcessSessionDeleteReport, lash_core::PluginError> {
515        let session_id_owned = session_id.to_string();
516        let (
517            revoked_handle_count,
518            deleted_wake_count,
519            mut orphaned_process_ids,
520            mut preserved_process_ids,
521        ) = self
522            .conn
523            .write_flow(move |tx| {
524                Ok(tx_outcome((|| {
525                    let session_id = session_id_owned;
526                    let removed = {
527                        let mut stmt = tx
528                            .prepare(
529                                "SELECT g.process_id, p.record_json
530                                 FROM process_handle_grants g
531                                 JOIN processes p ON p.process_id = g.process_id
532                                 WHERE g.session_id = ?1
533                                 ORDER BY g.process_id ASC",
534                            )
535                            .map_err(process_sqlite_error)?;
536                        let rows = stmt
537                            .query_map(params![session_id], |row| {
538                                Ok((row.get::<_, String>(0)?, row.get::<_, String>(1)?))
539                            })
540                            .map_err(process_sqlite_error)?;
541                        let mut removed = Vec::new();
542                        for row in rows {
543                            let (process_id, record_json) = row.map_err(process_sqlite_error)?;
544                            let record: ProcessRecord =
545                                serde_json::from_str(&record_json).map_err(process_decode_error)?;
546                            removed.push((process_id, record));
547                        }
548                        removed
549                    };
550
551                    // Wake acknowledgements are process-scoped consumed-event markers.
552                    // Session deletion removes materialized session-addressed deliveries
553                    // through the session store; clearing these rows would re-expose
554                    // already-consumed wakes to surviving grants or future host readers.
555                    let deleted_wake_count = 0;
556                    let revoked_handle_count = tx
557                        .execute(
558                            "DELETE FROM process_handle_grants WHERE session_id = ?1",
559                            params![session_id],
560                        )
561                        .map_err(process_sqlite_error)?;
562                    let mut orphaned_process_ids = Vec::new();
563                    let mut preserved_process_ids = Vec::new();
564                    for (process_id, record) in removed {
565                        if record.is_terminal() {
566                            continue;
567                        }
568                        let remaining_grants: i64 = tx
569                            .query_row(
570                                "SELECT COUNT(*) FROM process_handle_grants WHERE process_id = ?1",
571                                params![process_id],
572                                |row| row.get(0),
573                            )
574                            .map_err(process_sqlite_error)?;
575                        if remaining_grants == 0 {
576                            orphaned_process_ids.push(process_id);
577                        } else {
578                            preserved_process_ids.push(process_id);
579                        }
580                    }
581                    Ok((
582                        revoked_handle_count,
583                        deleted_wake_count,
584                        orphaned_process_ids,
585                        preserved_process_ids,
586                    ))
587                })()))
588            })
589            .await
590            .map_err(process_sqlite_error)??;
591        orphaned_process_ids.sort();
592        orphaned_process_ids.dedup();
593        preserved_process_ids.sort();
594        preserved_process_ids.dedup();
595        Ok(lash_core::ProcessSessionDeleteReport {
596            session_id: session_id.to_string(),
597            revoked_handle_count,
598            deleted_wake_count,
599            orphaned_process_ids,
600            preserved_process_ids,
601        })
602    }
603
604    async fn append_event(
605        &self,
606        process_id: &str,
607        request: ProcessEventAppendRequest,
608    ) -> Result<ProcessEventAppendResult, lash_core::PluginError> {
609        let process_id = process_id.to_string();
610        let (result, appended) = self
611            .conn
612            .write_flow(move |tx| {
613                Ok(tx_outcome((|| {
614                    let mut record =
615                        Self::load_process_conn(tx, &process_id)?.ok_or_else(|| {
616                            lash_core::PluginError::Session(format!(
617                                "unknown process `{process_id}`"
618                            ))
619                        })?;
620                    let replay_lookup = if let Some(replay_key) =
621                        request.replay.as_ref().map(|replay| replay.key.as_str())
622                    {
623                        Self::load_event_by_key_conn(tx, &process_id, replay_key)?
624                    } else {
625                        None
626                    };
627                    let sequence = tx
628                        .query_row(
629                            "SELECT COALESCE(MAX(sequence), 0) + 1 FROM process_events WHERE process_id = ?1",
630                            params![process_id],
631                            |row| row.get::<_, i64>(0),
632                        )
633                        .map_err(process_sqlite_error)? as u64;
634                    let occurred_at_ms = current_epoch_ms();
635                    let prepared = prepare_process_event_append(
636                        &record,
637                        request,
638                        sequence,
639                        replay_lookup,
640                        occurred_at_ms,
641                    )?;
642                    if prepared.replayed {
643                        return Ok((
644                            ProcessEventAppendResult {
645                                event: prepared.event,
646                                wake_delivery: prepared.wake_delivery,
647                            },
648                            false,
649                        ));
650                    }
651                    let event = prepared.event;
652                    tx.execute(
653                        "INSERT INTO process_events (
654                            process_id, sequence, event_type, payload_hash, idempotency_key,
655                            occurred_at_ms, event_json
656                         )
657                         VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7)",
658                        params![
659                            process_id,
660                            sequence as i64,
661                            event.event_type.as_str(),
662                            prepared.payload_hash.as_str(),
663                            event.invocation.replay_key(),
664                            prepared.occurred_at_ms as i64,
665                            process_encode_json(&event)?,
666                        ],
667                    )
668                    .map_err(process_sqlite_error)?;
669                    if let Some(status) = prepared.status_update.clone() {
670                        record.status = status;
671                        if record.status.is_terminal() {
672                            record.wait = None;
673                        }
674                    }
675                    record.updated_at_ms = prepared.occurred_at_ms;
676                    Self::save_process_conn(tx, &record)?;
677                    Ok((
678                        ProcessEventAppendResult {
679                            event,
680                            wake_delivery: prepared.wake_delivery,
681                        },
682                        true,
683                    ))
684                })()))
685            })
686            .await
687            .map_err(process_sqlite_error)??;
688        if appended {
689            self.notify.notify_waiters();
690        }
691        Ok(result)
692    }
693
694    async fn events_after(
695        &self,
696        process_id: &str,
697        after_sequence: u64,
698    ) -> Result<Vec<ProcessEvent>, lash_core::PluginError> {
699        let process_id = process_id.to_string();
700        self.conn
701            .call(move |conn| {
702                Ok((|| {
703                    if Self::load_process_conn(conn, &process_id)?.is_none() {
704                        return Err(lash_core::PluginError::Session(format!(
705                            "unknown process `{process_id}`"
706                        )));
707                    }
708                    let mut stmt = conn
709                        .prepare(
710                            "SELECT event_json FROM process_events
711                             WHERE process_id = ?1 AND sequence > ?2
712                             ORDER BY sequence ASC",
713                        )
714                        .map_err(process_sqlite_error)?;
715                    let rows = stmt
716                        .query_map(params![process_id, after_sequence as i64], |row| {
717                            row.get::<_, String>(0)
718                        })
719                        .map_err(process_sqlite_error)?;
720                    let mut events = Vec::new();
721                    for row in rows {
722                        events.push(
723                            serde_json::from_str(&row.map_err(process_sqlite_error)?)
724                                .map_err(process_decode_error)?,
725                        );
726                    }
727                    Ok(events)
728                })())
729            })
730            .await
731            .map_err(process_sqlite_error)?
732    }
733
734    async fn count_events_through(
735        &self,
736        process_id: &str,
737        event_type: &str,
738        up_to_sequence: u64,
739    ) -> Result<u64, lash_core::PluginError> {
740        let process_id = process_id.to_string();
741        let event_type = event_type.to_string();
742        self.conn
743            .call(move |conn| {
744                Ok((|| {
745                    if Self::load_process_conn(conn, &process_id)?.is_none() {
746                        return Err(lash_core::PluginError::Session(format!(
747                            "unknown process `{process_id}`"
748                        )));
749                    }
750                    conn.query_row(
751                        "SELECT COUNT(*) FROM process_events
752                         WHERE process_id = ?1 AND event_type = ?2 AND sequence <= ?3",
753                        params![process_id, event_type, up_to_sequence as i64],
754                        |row| row.get::<_, i64>(0),
755                    )
756                    .map(|count| count as u64)
757                    .map_err(process_sqlite_error)
758                })())
759            })
760            .await
761            .map_err(process_sqlite_error)?
762    }
763
764    async fn recent_events(
765        &self,
766        process_id: &str,
767        limit: usize,
768    ) -> Result<Vec<ProcessEvent>, lash_core::PluginError> {
769        let process_id = process_id.to_string();
770        self.conn
771            .call(move |conn| {
772                Ok((|| {
773                    if Self::load_process_conn(conn, &process_id)?.is_none() {
774                        return Err(lash_core::PluginError::Session(format!(
775                            "unknown process `{process_id}`"
776                        )));
777                    }
778                    let mut stmt = conn
779                        .prepare(
780                            "SELECT event_json FROM process_events
781                             WHERE process_id = ?1
782                             ORDER BY sequence DESC
783                             LIMIT ?2",
784                        )
785                        .map_err(process_sqlite_error)?;
786                    let rows = stmt
787                        .query_map(params![process_id, limit as i64], |row| {
788                            row.get::<_, String>(0)
789                        })
790                        .map_err(process_sqlite_error)?;
791                    let mut events: Vec<ProcessEvent> = Vec::new();
792                    for row in rows {
793                        events.push(
794                            serde_json::from_str(&row.map_err(process_sqlite_error)?)
795                                .map_err(process_decode_error)?,
796                        );
797                    }
798                    events.reverse();
799                    Ok(events)
800                })())
801            })
802            .await
803            .map_err(process_sqlite_error)?
804    }
805
806    async fn wake_events_after(
807        &self,
808        process_id: &str,
809        after_sequence: u64,
810    ) -> Result<Vec<ProcessEvent>, lash_core::PluginError> {
811        let acked: std::collections::HashSet<u64> = {
812            let process_id = process_id.to_string();
813            self.conn
814                .call(move |conn| {
815                    Ok(
816                        (|| -> Result<std::collections::HashSet<u64>, lash_core::PluginError> {
817                            let mut stmt = conn
818                                .prepare(
819                                    "SELECT sequence FROM process_wake_acks WHERE process_id = ?1",
820                                )
821                                .map_err(process_sqlite_error)?;
822                            let rows = stmt
823                                .query_map(params![process_id], |row| row.get::<_, i64>(0))
824                                .map_err(process_sqlite_error)?;
825                            let mut set = std::collections::HashSet::new();
826                            for row in rows {
827                                set.insert(row.map_err(process_sqlite_error)? as u64);
828                            }
829                            Ok(set)
830                        })(),
831                    )
832                })
833                .await
834                .map_err(process_sqlite_error)??
835        };
836        Ok(self
837            .events_after(process_id, after_sequence)
838            .await?
839            .into_iter()
840            .filter(|event| event.semantics.wake.is_some() && !acked.contains(&event.sequence))
841            .collect())
842    }
843
844    async fn wait_event_after(
845        &self,
846        process_id: &str,
847        event_type: &str,
848        after_sequence: u64,
849    ) -> Result<ProcessEvent, lash_core::PluginError> {
850        loop {
851            if let Some(event) = self
852                .events_after(process_id, after_sequence)
853                .await?
854                .into_iter()
855                .find(|event| event.event_type == event_type)
856            {
857                return Ok(event);
858            }
859            tokio::select! {
860                _ = self.notify.notified() => {}
861                _ = tokio::time::sleep(Duration::from_millis(50)) => {}
862            }
863        }
864    }
865
866    async fn await_process(
867        &self,
868        process_id: &str,
869    ) -> Result<ProcessAwaitOutput, lash_core::PluginError> {
870        loop {
871            let record = self.get_process(process_id).await.ok_or_else(|| {
872                lash_core::PluginError::Session(format!("unknown process `{process_id}`"))
873            })?;
874            if let Some(await_output) = record.status.await_output() {
875                return Ok(await_output.clone());
876            }
877            tokio::select! {
878                _ = self.notify.notified() => {}
879                _ = tokio::time::sleep(Duration::from_millis(50)) => {}
880            }
881        }
882    }
883
884    async fn complete_process(
885        &self,
886        process_id: &str,
887        await_output: ProcessAwaitOutput,
888    ) -> Result<ProcessRecord, lash_core::PluginError> {
889        let event_type = match await_output.terminal_state() {
890            lash_core::ProcessTerminalState::Completed => "process.completed",
891            lash_core::ProcessTerminalState::Failed => "process.failed",
892            lash_core::ProcessTerminalState::Cancelled => "process.cancelled",
893        };
894        self.append_event(
895            process_id,
896            ProcessEventAppendRequest::new(
897                event_type,
898                serde_json::json!({ "await_output": await_output }),
899            )
900            .with_replay_key(format!("process:{process_id}:terminal:{event_type}")),
901        )
902        .await?;
903        self.get_process(process_id).await.ok_or_else(|| {
904            lash_core::PluginError::Session(format!(
905                "unknown process `{process_id}` after terminal event"
906            ))
907        })
908    }
909
910    async fn set_process_wait(
911        &self,
912        process_id: &str,
913        wait: lash_core::WaitState,
914    ) -> Result<ProcessRecord, lash_core::PluginError> {
915        let process_id = process_id.to_string();
916        self.conn
917            .write_flow(move |tx| {
918                Ok(tx_outcome((|| {
919                    let mut record =
920                        Self::load_process_conn(tx, &process_id)?.ok_or_else(|| {
921                            lash_core::PluginError::Session(format!(
922                                "unknown process `{process_id}`"
923                            ))
924                        })?;
925                    if record.is_terminal() {
926                        return Err(lash_core::PluginError::Session(format!(
927                            "terminal process `{process_id}` cannot enter a wait state"
928                        )));
929                    }
930                    record.wait = Some(wait);
931                    record.updated_at_ms = current_epoch_ms();
932                    Self::save_process_conn(tx, &record)?;
933                    Ok(record)
934                })()))
935            })
936            .await
937            .map_err(process_sqlite_error)?
938    }
939
940    async fn clear_process_wait(
941        &self,
942        process_id: &str,
943    ) -> Result<ProcessRecord, lash_core::PluginError> {
944        let process_id = process_id.to_string();
945        self.conn
946            .write_flow(move |tx| {
947                Ok(tx_outcome((|| {
948                    let mut record =
949                        Self::load_process_conn(tx, &process_id)?.ok_or_else(|| {
950                            lash_core::PluginError::Session(format!(
951                                "unknown process `{process_id}`"
952                            ))
953                        })?;
954                    record.wait = None;
955                    record.updated_at_ms = current_epoch_ms();
956                    Self::save_process_conn(tx, &record)?;
957                    Ok(record)
958                })()))
959            })
960            .await
961            .map_err(process_sqlite_error)?
962    }
963
964    async fn get_process(&self, process_id: &str) -> Option<ProcessRecord> {
965        let process_id = process_id.to_string();
966        self.conn
967            .call(move |conn| Ok(Self::load_process_conn(conn, &process_id).ok().flatten()))
968            .await
969            .ok()
970            .flatten()
971    }
972
973    async fn list_processes(
974        &self,
975        filter: &lash_core::ProcessListFilter,
976    ) -> Result<Vec<ProcessRecord>, lash_core::PluginError> {
977        let filter = filter.clone();
978        self.conn
979            .call(move |conn| {
980                Ok((|| {
981                    let mut stmt = conn
982                        .prepare(
983                            "SELECT record_json FROM processes
984                             ORDER BY process_id ASC",
985                        )
986                        .map_err(process_sqlite_error)?;
987                    let rows = stmt
988                        .query_map([], |row| row.get::<_, String>(0))
989                        .map_err(process_sqlite_error)?;
990                    let mut records = Vec::new();
991                    for row in rows {
992                        let record: ProcessRecord =
993                            serde_json::from_str(&row.map_err(process_sqlite_error)?)
994                                .map_err(process_decode_error)?;
995                        if filter.matches_record(&record) {
996                            records.push(record);
997                        }
998                    }
999                    Ok(records)
1000                })())
1001            })
1002            .await
1003            .map_err(process_sqlite_error)?
1004    }
1005
1006    async fn ack_wake(
1007        &self,
1008        process_id: &str,
1009        sequence: u64,
1010    ) -> Result<(), lash_core::PluginError> {
1011        let process_id = process_id.to_string();
1012        self.conn
1013            .call(move |conn| {
1014                Ok((|| {
1015                    if Self::load_process_conn(conn, &process_id)?.is_none() {
1016                        return Err(lash_core::PluginError::Session(format!(
1017                            "unknown process `{process_id}`"
1018                        )));
1019                    }
1020                    conn.execute(
1021                        "INSERT OR IGNORE INTO process_wake_acks (process_id, sequence) VALUES (?1, ?2)",
1022                        params![process_id, sequence as i64],
1023                    )
1024                    .map_err(process_sqlite_error)?;
1025                    Ok(())
1026                })())
1027            })
1028            .await
1029            .map_err(process_sqlite_error)?
1030    }
1031
1032    async fn list_non_terminal(&self) -> Result<Vec<ProcessRecord>, lash_core::PluginError> {
1033        self.conn
1034            .call(move |conn| {
1035                Ok((|| {
1036                    let mut stmt = conn
1037                        .prepare(
1038                            "SELECT record_json FROM processes
1039                             WHERE status = 'running'
1040                             ORDER BY process_id ASC",
1041                        )
1042                        .map_err(process_sqlite_error)?;
1043                    let rows = stmt
1044                        .query_map([], |row| row.get::<_, String>(0))
1045                        .map_err(process_sqlite_error)?;
1046                    let mut records = Vec::new();
1047                    for row in rows {
1048                        let record: ProcessRecord =
1049                            serde_json::from_str(&row.map_err(process_sqlite_error)?)
1050                                .map_err(process_decode_error)?;
1051                        records.push(record);
1052                    }
1053                    Ok(records)
1054                })())
1055            })
1056            .await
1057            .map_err(process_sqlite_error)?
1058    }
1059
1060    async fn claim_process_lease(
1061        &self,
1062        process_id: &str,
1063        owner_id: &str,
1064        lease_ttl_ms: u64,
1065    ) -> Result<ProcessLease, lash_core::PluginError> {
1066        let process_id = process_id.to_string();
1067        let owner_id = owner_id.to_string();
1068        self.conn
1069            .write_flow(move |tx| {
1070                Ok(tx_outcome((|| {
1071                    if Self::load_process_conn(tx, &process_id)?.is_none() {
1072                        return Err(lash_core::PluginError::Session(format!(
1073                            "unknown process `{process_id}`"
1074                        )));
1075                    }
1076                    let now = current_epoch_ms();
1077                    let current = Self::load_process_lease_conn(tx, &process_id)?;
1078                    if let Some(current) = current.as_ref()
1079                        && current.expires_at_epoch_ms > now
1080                        && current.owner_id != owner_id
1081                    {
1082                        return Err(process_lease_conflict(&process_id, current));
1083                    }
1084                    // Read the raw fencing token directly: a completed/abandoned
1085                    // lease nulls the owner/token columns but retains the
1086                    // monotonically-increasing `lease_fencing_token`, so a
1087                    // re-claim never reuses a stale writer's token.
1088                    let fencing_token: u64 = tx
1089                        .query_row(
1090                            "SELECT lease_fencing_token FROM process_leases WHERE process_id = ?1",
1091                            params![process_id],
1092                            |row| row.get::<_, i64>(0),
1093                        )
1094                        .optional()
1095                        .map_err(process_sqlite_error)?
1096                        .unwrap_or(0) as u64
1097                        + 1;
1098                    let lease = ProcessLease {
1099                        schema_version: PROCESS_LEASE_SCHEMA_VERSION,
1100                        process_id: process_id.clone(),
1101                        owner_id: owner_id.clone(),
1102                        lease_token: format!(
1103                            "{:x}",
1104                            Sha256::digest(
1105                                format!("{process_id}:{owner_id}:{now}:{fencing_token}").as_bytes()
1106                            )
1107                        ),
1108                        fencing_token,
1109                        claimed_at_epoch_ms: now,
1110                        expires_at_epoch_ms: now.saturating_add(lease_ttl_ms),
1111                    };
1112                    tx.execute(
1113                        "INSERT INTO process_leases (
1114                            process_id, lease_owner_id, lease_token, lease_fencing_token,
1115                            lease_claimed_at_ms, lease_expires_at_ms
1116                         )
1117                         VALUES (?1, ?2, ?3, ?4, ?5, ?6)
1118                         ON CONFLICT(process_id) DO UPDATE SET
1119                            lease_owner_id = excluded.lease_owner_id,
1120                            lease_token = excluded.lease_token,
1121                            lease_fencing_token = excluded.lease_fencing_token,
1122                            lease_claimed_at_ms = excluded.lease_claimed_at_ms,
1123                            lease_expires_at_ms = excluded.lease_expires_at_ms",
1124                        params![
1125                            lease.process_id.as_str(),
1126                            lease.owner_id.as_str(),
1127                            lease.lease_token.as_str(),
1128                            lease.fencing_token as i64,
1129                            lease.claimed_at_epoch_ms as i64,
1130                            lease.expires_at_epoch_ms as i64,
1131                        ],
1132                    )
1133                    .map_err(process_sqlite_error)?;
1134                    Ok(lease)
1135                })()))
1136            })
1137            .await
1138            .map_err(process_sqlite_error)?
1139    }
1140
1141    async fn renew_process_lease(
1142        &self,
1143        lease: &ProcessLease,
1144        lease_ttl_ms: u64,
1145    ) -> Result<ProcessLease, lash_core::PluginError> {
1146        let lease = lease.clone();
1147        self.conn
1148            .write_flow(move |tx| {
1149                Ok(tx_outcome((|| {
1150                    let now = current_epoch_ms();
1151                    let current = Self::load_process_lease_conn(tx, &lease.process_id)?;
1152                    if !guard_lease(current.as_ref(), &lease.lease_token, now) {
1153                        return Err(process_lease_expired(&lease.process_id));
1154                    }
1155                    let renewed = ProcessLease {
1156                        expires_at_epoch_ms: now.saturating_add(lease_ttl_ms),
1157                        ..lease.clone()
1158                    };
1159                    tx.execute(
1160                        "UPDATE process_leases
1161                         SET lease_expires_at_ms = ?2
1162                         WHERE process_id = ?1 AND lease_token = ?3",
1163                        params![
1164                            renewed.process_id.as_str(),
1165                            renewed.expires_at_epoch_ms as i64,
1166                            renewed.lease_token.as_str(),
1167                        ],
1168                    )
1169                    .map_err(process_sqlite_error)?;
1170                    Ok(renewed)
1171                })()))
1172            })
1173            .await
1174            .map_err(process_sqlite_error)?
1175    }
1176
1177    async fn complete_process_lease(
1178        &self,
1179        completion: &ProcessLeaseCompletion,
1180    ) -> Result<(), lash_core::PluginError> {
1181        let process_id = completion.process_id.clone();
1182        let lease_token = completion.lease_token.clone();
1183        self.conn
1184            .call(move |conn| {
1185                conn.execute(
1186                    "UPDATE process_leases
1187                     SET lease_owner_id = NULL,
1188                         lease_token = NULL,
1189                         lease_claimed_at_ms = 0,
1190                         lease_expires_at_ms = 0
1191                     WHERE process_id = ?1 AND lease_token = ?2",
1192                    params![process_id, lease_token],
1193                )
1194            })
1195            .await
1196            .map_err(process_sqlite_error)?;
1197        Ok(())
1198    }
1199}
1200
1201/// Loud, stable error for a fenced process-lease claim on the `PluginError`
1202/// channel the [`ProcessRegistry`] trait returns.
1203fn process_lease_conflict(process_id: &str, current: &ProcessLease) -> lash_core::PluginError {
1204    lash_core::PluginError::Session(format!(
1205        "process `{process_id}` is already leased by `{}` until {}",
1206        current.owner_id, current.expires_at_epoch_ms
1207    ))
1208}
1209
1210/// Loud, stable error for a superseded or expired process lease.
1211fn process_lease_expired(process_id: &str) -> lash_core::PluginError {
1212    lash_core::PluginError::Session(format!(
1213        "process lease for `{process_id}` is missing or expired"
1214    ))
1215}