Skip to main content

lash_sqlite_store/
process_registry.rs

1//! SQLite-backed [`ProcessRegistry`] (`SqliteProcessRegistry`).
2//!
3//! First-party SQLite implementation of the public async process-registry
4//! surface. Every DB body is a *synchronous* rusqlite closure handed
5//! to [`SqliteConnection::call`] (reads) or [`SqliteConnection::write_flow`]
6//! (read-then-write).
7//!
8//! ## Why `write_flow`, not `write`
9//!
10//! The registry's transactional methods produce a [`lash_core::PluginError`],
11//! not a `rusqlite::Error`. `SqliteConnection::write` rolls back only when the
12//! closure returns `Err(rusqlite::Error)`, so a logical `PluginError` (e.g. a
13//! registration-hash conflict) would otherwise *commit* the partial work. Each
14//! such method therefore runs its synchronous body returning
15//! `Result<T, PluginError>` and maps it to a [`TxOutcome`]: `Ok` ⇒
16//! `Commit(Ok(value))`, `Err` ⇒ `Rollback(Err(error))`. That preserves the
17//! prior behaviour of rolling back on every error while still carrying the
18//! `PluginError` back to the caller. The outer `rusqlite::Error` channel only
19//! carries genuine SQLite/connection failures, mapped via `process_sqlite_error`.
20//!
21//! The `*_conn` helpers are synchronous and take a `&rusqlite::Connection` so
22//! they compose inside either closure — including from within a `&Transaction`,
23//! which derefs to `&Connection`.
24
25use super::*;
26
27fn process_status_label(record: &ProcessRecord) -> &'static str {
28    record.status.label()
29}
30
31impl SqliteProcessRegistry {
32    pub async fn open(path: &Path) -> tokio_rusqlite::Result<Self> {
33        let conn = SqliteConnection::open(path).await?;
34        ensure_process_schema(&conn).await?;
35        apply_pragmas(&conn, StoreBacking::File).await?;
36        Ok(Self {
37            conn,
38            notify: tokio::sync::Notify::new(),
39        })
40    }
41
42    pub async fn memory() -> tokio_rusqlite::Result<Self> {
43        let conn = SqliteConnection::open_in_memory().await?;
44        ensure_process_schema(&conn).await?;
45        apply_pragmas(&conn, StoreBacking::Memory).await?;
46        Ok(Self {
47            conn,
48            notify: tokio::sync::Notify::new(),
49        })
50    }
51
52    fn load_process_conn(
53        conn: &Connection,
54        process_id: &str,
55    ) -> Result<Option<ProcessRecord>, lash_core::PluginError> {
56        let json: Option<String> = conn
57            .query_row(
58                "SELECT record_json FROM processes WHERE process_id = ?1",
59                params![process_id],
60                |row| row.get(0),
61            )
62            .optional()
63            .map_err(process_sqlite_error)?;
64        json.map(|json| serde_json::from_str(&json).map_err(process_decode_error))
65            .transpose()
66    }
67
68    fn save_process_conn(
69        conn: &Connection,
70        record: &ProcessRecord,
71    ) -> Result<(), lash_core::PluginError> {
72        conn.execute(
73            "UPDATE processes
74             SET updated_at_ms = ?2, status = ?3, record_json = ?4
75             WHERE process_id = ?1",
76            params![
77                record.id.as_str(),
78                record.updated_at_ms as i64,
79                process_status_label(record),
80                process_encode_json(record)?
81            ],
82        )
83        .map_err(process_sqlite_error)?;
84        Ok(())
85    }
86
87    fn load_event_by_key_conn(
88        conn: &Connection,
89        process_id: &str,
90        replay_key: &str,
91    ) -> Result<Option<(String, ProcessEvent)>, lash_core::PluginError> {
92        let row: Option<(String, String)> = conn
93            .query_row(
94                "SELECT payload_hash, event_json
95                 FROM process_events
96                 WHERE process_id = ?1 AND idempotency_key = ?2",
97                params![process_id, replay_key],
98                |row| Ok((row.get(0)?, row.get(1)?)),
99            )
100            .optional()
101            .map_err(process_sqlite_error)?;
102        row.map(|(hash, json)| {
103            serde_json::from_str(&json)
104                .map(|event| (hash, event))
105                .map_err(process_decode_error)
106        })
107        .transpose()
108    }
109
110    fn load_process_lease_conn(
111        conn: &Connection,
112        process_id: &str,
113    ) -> Result<Option<ProcessLease>, lash_core::PluginError> {
114        conn.query_row(
115            "SELECT lease_owner_id, lease_token, lease_fencing_token,
116                    lease_claimed_at_ms, lease_expires_at_ms
117             FROM process_leases
118             WHERE process_id = ?1",
119            params![process_id],
120            |row| {
121                let owner_id: Option<String> = row.get(0)?;
122                let lease_token: Option<String> = row.get(1)?;
123                let (Some(owner_id), Some(lease_token)) = (owner_id, lease_token) else {
124                    return Ok(None);
125                };
126                Ok(Some(ProcessLease {
127                    schema_version: PROCESS_LEASE_SCHEMA_VERSION,
128                    process_id: process_id.to_string(),
129                    owner_id,
130                    lease_token,
131                    fencing_token: row.get::<_, i64>(2)? as u64,
132                    claimed_at_epoch_ms: row.get::<_, i64>(3)? as u64,
133                    expires_at_epoch_ms: row.get::<_, i64>(4)? as u64,
134                }))
135            },
136        )
137        .optional()
138        .map(|lease| lease.flatten())
139        .map_err(process_sqlite_error)
140    }
141
142    fn list_grants_for_scope_conn(
143        conn: &Connection,
144        session_scope: &SessionScope,
145        live_only: bool,
146    ) -> Result<Vec<ProcessHandleGrantEntry>, lash_core::PluginError> {
147        let session_scope_id = session_scope.id();
148        let status_clause = if live_only {
149            "AND p.status = 'running'"
150        } else {
151            ""
152        };
153        let mut stmt = conn
154            .prepare(&format!(
155                "SELECT g.process_id, g.descriptor_json, p.record_json
156                 FROM process_handle_grants g
157                 JOIN processes p ON p.process_id = g.process_id
158                 WHERE g.scope_id = ?1 {status_clause}
159                 ORDER BY g.process_id ASC"
160            ))
161            .map_err(process_sqlite_error)?;
162        let rows = stmt
163            .query_map(params![session_scope_id.as_str()], |row| {
164                Ok((
165                    row.get::<_, String>(0)?,
166                    row.get::<_, String>(1)?,
167                    row.get::<_, String>(2)?,
168                ))
169            })
170            .map_err(process_sqlite_error)?;
171        let mut entries = Vec::new();
172        for row in rows {
173            let (process_id, descriptor_json, record_json) = row.map_err(process_sqlite_error)?;
174            let descriptor: ProcessHandleDescriptor =
175                serde_json::from_str(&descriptor_json).map_err(process_decode_error)?;
176            let record: ProcessRecord =
177                serde_json::from_str(&record_json).map_err(process_decode_error)?;
178            entries.push((
179                ProcessHandleGrant {
180                    session_id: session_scope.session_id.clone(),
181                    process_id,
182                    descriptor,
183                },
184                record,
185            ));
186        }
187        Ok(entries)
188    }
189}
190
191/// Map a `Result<T, PluginError>` produced by a synchronous transaction body to
192/// a [`TxOutcome`]: commit on success, roll back on logical error. Both arms
193/// carry the inner `Result` back so the caller recovers the value or the
194/// `PluginError` after the transaction resolves.
195fn tx_outcome<T>(
196    result: Result<T, lash_core::PluginError>,
197) -> TxOutcome<Result<T, lash_core::PluginError>> {
198    match result {
199        Ok(value) => TxOutcome::Commit(Ok(value)),
200        Err(err) => TxOutcome::Rollback(Err(err)),
201    }
202}
203
204#[async_trait::async_trait]
205impl ProcessRegistry for SqliteProcessRegistry {
206    fn durability_tier(&self) -> DurabilityTier {
207        DurabilityTier::Durable
208    }
209
210    async fn register_process(
211        &self,
212        registration: ProcessRegistration,
213    ) -> Result<ProcessRecord, lash_core::PluginError> {
214        let (registration, registration_hash) = prepare_process_registration(registration)?;
215        let record = self
216            .conn
217            .write_flow(move |tx| {
218                Ok(tx_outcome((|| {
219                    if let Some(existing) = Self::load_process_conn(tx, &registration.id)? {
220                        if existing.registration_hash == registration_hash {
221                            return Ok(existing);
222                        }
223                        return Err(lash_core::PluginError::Session(format!(
224                            "process `{}` registration hash conflict: existing {}, new {}",
225                            registration.id, existing.registration_hash, registration_hash
226                        )));
227                    }
228                    let now = current_epoch_ms();
229                    let record = ProcessRecord::from_prepared_registration(
230                        registration,
231                        registration_hash,
232                        now,
233                    );
234                    let originator_scope_id = record.originator_scope_id();
235                    tx.execute(
236                        "INSERT INTO processes (
237                            process_id, registration_hash, owner_scope_id, host_profile_id,
238                            created_at_ms, updated_at_ms, status, record_json
239                         )
240                         VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8)",
241                        params![
242                            record.id.as_str(),
243                            record.registration_hash.as_str(),
244                            originator_scope_id.as_str(),
245                            record.host_profile_id(),
246                            record.created_at_ms as i64,
247                            record.updated_at_ms as i64,
248                            process_status_label(&record),
249                            process_encode_json(&record)?,
250                        ],
251                    )
252                    .map_err(process_sqlite_error)?;
253                    Ok(record)
254                })()))
255            })
256            .await
257            .map_err(process_sqlite_error)??;
258        self.notify.notify_waiters();
259        Ok(record)
260    }
261
262    async fn set_external_ref(
263        &self,
264        process_id: &str,
265        external_ref: ProcessExternalRef,
266    ) -> Result<ProcessRecord, lash_core::PluginError> {
267        let process_id = process_id.to_string();
268        let record = self
269            .conn
270            .write_flow(move |tx| {
271                Ok(tx_outcome((|| {
272                    let mut record =
273                        Self::load_process_conn(tx, &process_id)?.ok_or_else(|| {
274                            lash_core::PluginError::Session(format!(
275                                "unknown process `{process_id}`"
276                            ))
277                        })?;
278                    record.external_ref = Some(external_ref);
279                    record.updated_at_ms = current_epoch_ms();
280                    Self::save_process_conn(tx, &record)?;
281                    Ok(record)
282                })()))
283            })
284            .await
285            .map_err(process_sqlite_error)??;
286        self.notify.notify_waiters();
287        Ok(record)
288    }
289
290    async fn grant_handle(
291        &self,
292        session_scope: &SessionScope,
293        process_id: &str,
294        descriptor: ProcessHandleDescriptor,
295    ) -> Result<ProcessHandleGrant, lash_core::PluginError> {
296        let session_scope = session_scope.clone();
297        let process_id = process_id.to_string();
298        self.conn
299            .write_flow(move |tx| {
300                Ok(tx_outcome((|| {
301                    let session_scope_id = session_scope.id();
302                    if Self::load_process_conn(tx, &process_id)?.is_none() {
303                        return Err(lash_core::PluginError::Session(format!(
304                            "unknown process `{process_id}`"
305                        )));
306                    }
307                    tx.execute(
308                        "INSERT INTO process_handle_grants (session_id, scope_id, process_id, descriptor_json)
309                         VALUES (?1, ?2, ?3, ?4)
310                         ON CONFLICT(scope_id, process_id) DO UPDATE SET
311                            session_id = excluded.session_id,
312                            descriptor_json = excluded.descriptor_json",
313                        params![
314                            session_scope.session_id.as_str(),
315                            session_scope_id.as_str(),
316                            process_id.as_str(),
317                            process_encode_json(&descriptor)?
318                        ],
319                    )
320                    .map_err(process_sqlite_error)?;
321                    Ok(ProcessHandleGrant {
322                        session_id: session_scope.session_id.clone(),
323                        process_id: process_id.clone(),
324                        descriptor,
325                    })
326                })()))
327            })
328            .await
329            .map_err(process_sqlite_error)?
330    }
331
332    async fn revoke_handle(
333        &self,
334        session_scope: &SessionScope,
335        process_id: &str,
336    ) -> Result<(), lash_core::PluginError> {
337        let session_scope_id = session_scope.id().as_str().to_string();
338        let process_id = process_id.to_string();
339        self.conn
340            .call(move |conn| {
341                conn.execute(
342                    "DELETE FROM process_handle_grants WHERE scope_id = ?1 AND process_id = ?2",
343                    params![session_scope_id, process_id],
344                )
345            })
346            .await
347            .map_err(process_sqlite_error)?;
348        Ok(())
349    }
350
351    async fn transfer_handle_grants(
352        &self,
353        from_scope: &SessionScope,
354        to_scope: &SessionScope,
355        process_ids: &[String],
356    ) -> Result<(), lash_core::PluginError> {
357        let from_scope = from_scope.clone();
358        let to_scope = to_scope.clone();
359        let process_ids = process_ids.to_vec();
360        self.conn
361            .write_flow(move |tx| {
362                Ok(tx_outcome((|| {
363                    let from_scope_id = from_scope.id();
364                    let to_scope_id = to_scope.id();
365                    for process_id in &process_ids {
366                        let descriptor_json: Option<String> = tx
367                            .query_row(
368                                "SELECT descriptor_json
369                                 FROM process_handle_grants
370                                 WHERE scope_id = ?1 AND process_id = ?2",
371                                params![from_scope_id.as_str(), process_id.as_str()],
372                                |row| row.get(0),
373                            )
374                            .optional()
375                            .map_err(process_sqlite_error)?;
376                        let Some(descriptor_json) = descriptor_json else {
377                            return Err(lash_core::PluginError::Session(format!(
378                                "process handle `{process_id}` is not granted to session `{}`",
379                                from_scope.session_id
380                            )));
381                        };
382                        tx.execute(
383                            "DELETE FROM process_handle_grants
384                             WHERE scope_id = ?1 AND process_id = ?2",
385                            params![from_scope_id.as_str(), process_id.as_str()],
386                        )
387                        .map_err(process_sqlite_error)?;
388                        tx.execute(
389                            "INSERT INTO process_handle_grants (session_id, scope_id, process_id, descriptor_json)
390                             VALUES (?1, ?2, ?3, ?4)
391                             ON CONFLICT(scope_id, process_id) DO UPDATE SET
392                                session_id = excluded.session_id,
393                                descriptor_json = excluded.descriptor_json",
394                            params![
395                                to_scope.session_id.as_str(),
396                                to_scope_id.as_str(),
397                                process_id.as_str(),
398                                descriptor_json
399                            ],
400                        )
401                        .map_err(process_sqlite_error)?;
402                    }
403                    Ok(())
404                })()))
405            })
406            .await
407            .map_err(process_sqlite_error)?
408    }
409
410    async fn list_handle_grants(
411        &self,
412        session_scope: &SessionScope,
413    ) -> Result<Vec<ProcessHandleGrantEntry>, lash_core::PluginError> {
414        let session_scope = session_scope.clone();
415        self.conn
416            .call(move |conn| {
417                Ok(Self::list_grants_for_scope_conn(
418                    conn,
419                    &session_scope,
420                    false,
421                ))
422            })
423            .await
424            .map_err(process_sqlite_error)?
425    }
426
427    async fn list_live_handle_grants(
428        &self,
429        session_scope: &SessionScope,
430    ) -> Result<Vec<ProcessHandleGrantEntry>, lash_core::PluginError> {
431        let session_scope = session_scope.clone();
432        self.conn
433            .call(move |conn| Ok(Self::list_grants_for_scope_conn(conn, &session_scope, true)))
434            .await
435            .map_err(process_sqlite_error)?
436    }
437
438    async fn has_handle_grant(
439        &self,
440        session_scope: &SessionScope,
441        process_id: &str,
442    ) -> Result<bool, lash_core::PluginError> {
443        let session_scope_id = session_scope.id().as_str().to_string();
444        let process_id = process_id.to_string();
445        self.conn
446            .call(move |conn| {
447                let exists = conn
448                    .query_row(
449                        "SELECT 1
450                         FROM process_handle_grants g
451                         JOIN processes p ON p.process_id = g.process_id
452                         WHERE g.scope_id = ?1 AND g.process_id = ?2
453                         LIMIT 1",
454                        params![session_scope_id, process_id],
455                        |_| Ok(()),
456                    )
457                    .optional()?
458                    .is_some();
459                Ok(exists)
460            })
461            .await
462            .map_err(process_sqlite_error)
463    }
464
465    async fn handle_grants_for_process(
466        &self,
467        process_id: &str,
468    ) -> Result<Vec<ProcessHandleGrant>, lash_core::PluginError> {
469        let process_id = process_id.to_string();
470        self.conn
471            .call(move |conn| {
472                Ok((|| {
473                    if Self::load_process_conn(conn, &process_id)?.is_none() {
474                        return Err(lash_core::PluginError::Session(format!(
475                            "unknown process `{process_id}`"
476                        )));
477                    }
478                    let mut stmt = conn
479                        .prepare(
480                            "SELECT session_id, descriptor_json
481                             FROM process_handle_grants
482                             WHERE process_id = ?1
483                             ORDER BY session_id ASC, scope_id ASC",
484                        )
485                        .map_err(process_sqlite_error)?;
486                    let rows = stmt
487                        .query_map(params![process_id], |row| {
488                            Ok((row.get::<_, String>(0)?, row.get::<_, String>(1)?))
489                        })
490                        .map_err(process_sqlite_error)?;
491                    let mut grants = Vec::new();
492                    for row in rows {
493                        let (session_id, descriptor_json) = row.map_err(process_sqlite_error)?;
494                        let descriptor: ProcessHandleDescriptor =
495                            serde_json::from_str(&descriptor_json).map_err(process_decode_error)?;
496                        grants.push(ProcessHandleGrant {
497                            session_id,
498                            process_id: process_id.clone(),
499                            descriptor,
500                        });
501                    }
502                    Ok(grants)
503                })())
504            })
505            .await
506            .map_err(process_sqlite_error)?
507    }
508
509    async fn delete_session_process_state(
510        &self,
511        session_id: &str,
512    ) -> Result<lash_core::ProcessSessionDeleteReport, lash_core::PluginError> {
513        let session_id_owned = session_id.to_string();
514        let (
515            revoked_handle_count,
516            deleted_wake_count,
517            mut orphaned_process_ids,
518            mut preserved_process_ids,
519        ) = self
520            .conn
521            .write_flow(move |tx| {
522                Ok(tx_outcome((|| {
523                    let session_id = session_id_owned;
524                    let removed = {
525                        let mut stmt = tx
526                            .prepare(
527                                "SELECT g.process_id, p.record_json
528                                 FROM process_handle_grants g
529                                 JOIN processes p ON p.process_id = g.process_id
530                                 WHERE g.session_id = ?1
531                                 ORDER BY g.process_id ASC",
532                            )
533                            .map_err(process_sqlite_error)?;
534                        let rows = stmt
535                            .query_map(params![session_id], |row| {
536                                Ok((row.get::<_, String>(0)?, row.get::<_, String>(1)?))
537                            })
538                            .map_err(process_sqlite_error)?;
539                        let mut removed = Vec::new();
540                        for row in rows {
541                            let (process_id, record_json) = row.map_err(process_sqlite_error)?;
542                            let record: ProcessRecord =
543                                serde_json::from_str(&record_json).map_err(process_decode_error)?;
544                            removed.push((process_id, record));
545                        }
546                        removed
547                    };
548
549                    // Wake acknowledgements are process-scoped consumed-event markers.
550                    // Session deletion removes materialized session-addressed deliveries
551                    // through the session store; clearing these rows would re-expose
552                    // already-consumed wakes to surviving grants or future host readers.
553                    let deleted_wake_count = 0;
554                    let revoked_handle_count = tx
555                        .execute(
556                            "DELETE FROM process_handle_grants WHERE session_id = ?1",
557                            params![session_id],
558                        )
559                        .map_err(process_sqlite_error)?;
560                    let mut orphaned_process_ids = Vec::new();
561                    let mut preserved_process_ids = Vec::new();
562                    for (process_id, record) in removed {
563                        if record.is_terminal() {
564                            continue;
565                        }
566                        let remaining_grants: i64 = tx
567                            .query_row(
568                                "SELECT COUNT(*) FROM process_handle_grants WHERE process_id = ?1",
569                                params![process_id],
570                                |row| row.get(0),
571                            )
572                            .map_err(process_sqlite_error)?;
573                        if remaining_grants == 0 {
574                            orphaned_process_ids.push(process_id);
575                        } else {
576                            preserved_process_ids.push(process_id);
577                        }
578                    }
579                    Ok((
580                        revoked_handle_count,
581                        deleted_wake_count,
582                        orphaned_process_ids,
583                        preserved_process_ids,
584                    ))
585                })()))
586            })
587            .await
588            .map_err(process_sqlite_error)??;
589        orphaned_process_ids.sort();
590        orphaned_process_ids.dedup();
591        preserved_process_ids.sort();
592        preserved_process_ids.dedup();
593        Ok(lash_core::ProcessSessionDeleteReport {
594            session_id: session_id.to_string(),
595            revoked_handle_count,
596            deleted_wake_count,
597            orphaned_process_ids,
598            preserved_process_ids,
599        })
600    }
601
602    async fn append_event(
603        &self,
604        process_id: &str,
605        request: ProcessEventAppendRequest,
606    ) -> Result<ProcessEventAppendResult, lash_core::PluginError> {
607        let process_id = process_id.to_string();
608        let (result, appended) = self
609            .conn
610            .write_flow(move |tx| {
611                Ok(tx_outcome((|| {
612                    let mut record =
613                        Self::load_process_conn(tx, &process_id)?.ok_or_else(|| {
614                            lash_core::PluginError::Session(format!(
615                                "unknown process `{process_id}`"
616                            ))
617                        })?;
618                    let replay_lookup = if let Some(replay_key) =
619                        request.replay.as_ref().map(|replay| replay.key.as_str())
620                    {
621                        Self::load_event_by_key_conn(tx, &process_id, replay_key)?
622                    } else {
623                        None
624                    };
625                    let sequence = tx
626                        .query_row(
627                            "SELECT COALESCE(MAX(sequence), 0) + 1 FROM process_events WHERE process_id = ?1",
628                            params![process_id],
629                            |row| row.get::<_, i64>(0),
630                        )
631                        .map_err(process_sqlite_error)? as u64;
632                    let occurred_at_ms = current_epoch_ms();
633                    let prepared = prepare_process_event_append(
634                        &record,
635                        request,
636                        sequence,
637                        replay_lookup,
638                        occurred_at_ms,
639                    )?;
640                    if prepared.replayed {
641                        return Ok((
642                            ProcessEventAppendResult {
643                                event: prepared.event,
644                                wake_delivery: prepared.wake_delivery,
645                            },
646                            false,
647                        ));
648                    }
649                    let event = prepared.event;
650                    tx.execute(
651                        "INSERT INTO process_events (
652                            process_id, sequence, event_type, payload_hash, idempotency_key,
653                            occurred_at_ms, event_json
654                         )
655                         VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7)",
656                        params![
657                            process_id,
658                            sequence as i64,
659                            event.event_type.as_str(),
660                            prepared.payload_hash.as_str(),
661                            event.invocation.replay_key(),
662                            prepared.occurred_at_ms as i64,
663                            process_encode_json(&event)?,
664                        ],
665                    )
666                    .map_err(process_sqlite_error)?;
667                    if let Some(status) = prepared.status_update.clone() {
668                        record.status = status;
669                        if record.status.is_terminal() {
670                            record.wait = None;
671                        }
672                    }
673                    record.updated_at_ms = prepared.occurred_at_ms;
674                    Self::save_process_conn(tx, &record)?;
675                    Ok((
676                        ProcessEventAppendResult {
677                            event,
678                            wake_delivery: prepared.wake_delivery,
679                        },
680                        true,
681                    ))
682                })()))
683            })
684            .await
685            .map_err(process_sqlite_error)??;
686        if appended {
687            self.notify.notify_waiters();
688        }
689        Ok(result)
690    }
691
692    async fn events_after(
693        &self,
694        process_id: &str,
695        after_sequence: u64,
696    ) -> Result<Vec<ProcessEvent>, lash_core::PluginError> {
697        let process_id = process_id.to_string();
698        self.conn
699            .call(move |conn| {
700                Ok((|| {
701                    if Self::load_process_conn(conn, &process_id)?.is_none() {
702                        return Err(lash_core::PluginError::Session(format!(
703                            "unknown process `{process_id}`"
704                        )));
705                    }
706                    let mut stmt = conn
707                        .prepare(
708                            "SELECT event_json FROM process_events
709                             WHERE process_id = ?1 AND sequence > ?2
710                             ORDER BY sequence ASC",
711                        )
712                        .map_err(process_sqlite_error)?;
713                    let rows = stmt
714                        .query_map(params![process_id, after_sequence as i64], |row| {
715                            row.get::<_, String>(0)
716                        })
717                        .map_err(process_sqlite_error)?;
718                    let mut events = Vec::new();
719                    for row in rows {
720                        events.push(
721                            serde_json::from_str(&row.map_err(process_sqlite_error)?)
722                                .map_err(process_decode_error)?,
723                        );
724                    }
725                    Ok(events)
726                })())
727            })
728            .await
729            .map_err(process_sqlite_error)?
730    }
731
732    async fn count_events_through(
733        &self,
734        process_id: &str,
735        event_type: &str,
736        up_to_sequence: u64,
737    ) -> Result<u64, lash_core::PluginError> {
738        let process_id = process_id.to_string();
739        let event_type = event_type.to_string();
740        self.conn
741            .call(move |conn| {
742                Ok((|| {
743                    if Self::load_process_conn(conn, &process_id)?.is_none() {
744                        return Err(lash_core::PluginError::Session(format!(
745                            "unknown process `{process_id}`"
746                        )));
747                    }
748                    conn.query_row(
749                        "SELECT COUNT(*) FROM process_events
750                         WHERE process_id = ?1 AND event_type = ?2 AND sequence <= ?3",
751                        params![process_id, event_type, up_to_sequence as i64],
752                        |row| row.get::<_, i64>(0),
753                    )
754                    .map(|count| count as u64)
755                    .map_err(process_sqlite_error)
756                })())
757            })
758            .await
759            .map_err(process_sqlite_error)?
760    }
761
762    async fn recent_events(
763        &self,
764        process_id: &str,
765        limit: usize,
766    ) -> Result<Vec<ProcessEvent>, lash_core::PluginError> {
767        let process_id = process_id.to_string();
768        self.conn
769            .call(move |conn| {
770                Ok((|| {
771                    if Self::load_process_conn(conn, &process_id)?.is_none() {
772                        return Err(lash_core::PluginError::Session(format!(
773                            "unknown process `{process_id}`"
774                        )));
775                    }
776                    let mut stmt = conn
777                        .prepare(
778                            "SELECT event_json FROM process_events
779                             WHERE process_id = ?1
780                             ORDER BY sequence DESC
781                             LIMIT ?2",
782                        )
783                        .map_err(process_sqlite_error)?;
784                    let rows = stmt
785                        .query_map(params![process_id, limit as i64], |row| {
786                            row.get::<_, String>(0)
787                        })
788                        .map_err(process_sqlite_error)?;
789                    let mut events: Vec<ProcessEvent> = Vec::new();
790                    for row in rows {
791                        events.push(
792                            serde_json::from_str(&row.map_err(process_sqlite_error)?)
793                                .map_err(process_decode_error)?,
794                        );
795                    }
796                    events.reverse();
797                    Ok(events)
798                })())
799            })
800            .await
801            .map_err(process_sqlite_error)?
802    }
803
804    async fn wake_events_after(
805        &self,
806        process_id: &str,
807        after_sequence: u64,
808    ) -> Result<Vec<ProcessEvent>, lash_core::PluginError> {
809        let acked: std::collections::HashSet<u64> = {
810            let process_id = process_id.to_string();
811            self.conn
812                .call(move |conn| {
813                    Ok(
814                        (|| -> Result<std::collections::HashSet<u64>, lash_core::PluginError> {
815                            let mut stmt = conn
816                                .prepare(
817                                    "SELECT sequence FROM process_wake_acks WHERE process_id = ?1",
818                                )
819                                .map_err(process_sqlite_error)?;
820                            let rows = stmt
821                                .query_map(params![process_id], |row| row.get::<_, i64>(0))
822                                .map_err(process_sqlite_error)?;
823                            let mut set = std::collections::HashSet::new();
824                            for row in rows {
825                                set.insert(row.map_err(process_sqlite_error)? as u64);
826                            }
827                            Ok(set)
828                        })(),
829                    )
830                })
831                .await
832                .map_err(process_sqlite_error)??
833        };
834        Ok(self
835            .events_after(process_id, after_sequence)
836            .await?
837            .into_iter()
838            .filter(|event| event.semantics.wake.is_some() && !acked.contains(&event.sequence))
839            .collect())
840    }
841
842    async fn wait_event_after(
843        &self,
844        process_id: &str,
845        event_type: &str,
846        after_sequence: u64,
847    ) -> Result<ProcessEvent, lash_core::PluginError> {
848        loop {
849            if let Some(event) = self
850                .events_after(process_id, after_sequence)
851                .await?
852                .into_iter()
853                .find(|event| event.event_type == event_type)
854            {
855                return Ok(event);
856            }
857            tokio::select! {
858                _ = self.notify.notified() => {}
859                _ = tokio::time::sleep(Duration::from_millis(50)) => {}
860            }
861        }
862    }
863
864    async fn await_process(
865        &self,
866        process_id: &str,
867    ) -> Result<ProcessAwaitOutput, lash_core::PluginError> {
868        loop {
869            let record = self.get_process(process_id).await.ok_or_else(|| {
870                lash_core::PluginError::Session(format!("unknown process `{process_id}`"))
871            })?;
872            if let Some(await_output) = record.status.await_output() {
873                return Ok(await_output.clone());
874            }
875            tokio::select! {
876                _ = self.notify.notified() => {}
877                _ = tokio::time::sleep(Duration::from_millis(50)) => {}
878            }
879        }
880    }
881
882    async fn complete_process(
883        &self,
884        process_id: &str,
885        await_output: ProcessAwaitOutput,
886    ) -> Result<ProcessRecord, lash_core::PluginError> {
887        let event_type = match await_output.terminal_state() {
888            lash_core::ProcessTerminalState::Completed => "process.completed",
889            lash_core::ProcessTerminalState::Failed => "process.failed",
890            lash_core::ProcessTerminalState::Cancelled => "process.cancelled",
891        };
892        self.append_event(
893            process_id,
894            ProcessEventAppendRequest::new(
895                event_type,
896                serde_json::json!({ "await_output": await_output }),
897            )
898            .with_replay_key(format!("process:{process_id}:terminal:{event_type}")),
899        )
900        .await?;
901        self.get_process(process_id).await.ok_or_else(|| {
902            lash_core::PluginError::Session(format!(
903                "unknown process `{process_id}` after terminal event"
904            ))
905        })
906    }
907
908    async fn set_process_wait(
909        &self,
910        process_id: &str,
911        wait: lash_core::WaitState,
912    ) -> Result<ProcessRecord, lash_core::PluginError> {
913        let process_id = process_id.to_string();
914        self.conn
915            .write_flow(move |tx| {
916                Ok(tx_outcome((|| {
917                    let mut record =
918                        Self::load_process_conn(tx, &process_id)?.ok_or_else(|| {
919                            lash_core::PluginError::Session(format!(
920                                "unknown process `{process_id}`"
921                            ))
922                        })?;
923                    if record.is_terminal() {
924                        return Err(lash_core::PluginError::Session(format!(
925                            "terminal process `{process_id}` cannot enter a wait state"
926                        )));
927                    }
928                    record.wait = Some(wait);
929                    record.updated_at_ms = current_epoch_ms();
930                    Self::save_process_conn(tx, &record)?;
931                    Ok(record)
932                })()))
933            })
934            .await
935            .map_err(process_sqlite_error)?
936    }
937
938    async fn clear_process_wait(
939        &self,
940        process_id: &str,
941    ) -> Result<ProcessRecord, lash_core::PluginError> {
942        let process_id = process_id.to_string();
943        self.conn
944            .write_flow(move |tx| {
945                Ok(tx_outcome((|| {
946                    let mut record =
947                        Self::load_process_conn(tx, &process_id)?.ok_or_else(|| {
948                            lash_core::PluginError::Session(format!(
949                                "unknown process `{process_id}`"
950                            ))
951                        })?;
952                    record.wait = None;
953                    record.updated_at_ms = current_epoch_ms();
954                    Self::save_process_conn(tx, &record)?;
955                    Ok(record)
956                })()))
957            })
958            .await
959            .map_err(process_sqlite_error)?
960    }
961
962    async fn get_process(&self, process_id: &str) -> Option<ProcessRecord> {
963        let process_id = process_id.to_string();
964        self.conn
965            .call(move |conn| Ok(Self::load_process_conn(conn, &process_id).ok().flatten()))
966            .await
967            .ok()
968            .flatten()
969    }
970
971    async fn list_processes(
972        &self,
973        filter: &lash_core::ProcessListFilter,
974    ) -> Result<Vec<ProcessRecord>, lash_core::PluginError> {
975        let filter = filter.clone();
976        self.conn
977            .call(move |conn| {
978                Ok((|| {
979                    let mut stmt = conn
980                        .prepare(
981                            "SELECT record_json FROM processes
982                             ORDER BY process_id ASC",
983                        )
984                        .map_err(process_sqlite_error)?;
985                    let rows = stmt
986                        .query_map([], |row| row.get::<_, String>(0))
987                        .map_err(process_sqlite_error)?;
988                    let mut records = Vec::new();
989                    for row in rows {
990                        let record: ProcessRecord =
991                            serde_json::from_str(&row.map_err(process_sqlite_error)?)
992                                .map_err(process_decode_error)?;
993                        if filter.matches_record(&record) {
994                            records.push(record);
995                        }
996                    }
997                    Ok(records)
998                })())
999            })
1000            .await
1001            .map_err(process_sqlite_error)?
1002    }
1003
1004    async fn ack_wake(
1005        &self,
1006        process_id: &str,
1007        sequence: u64,
1008    ) -> Result<(), lash_core::PluginError> {
1009        let process_id = process_id.to_string();
1010        self.conn
1011            .call(move |conn| {
1012                Ok((|| {
1013                    if Self::load_process_conn(conn, &process_id)?.is_none() {
1014                        return Err(lash_core::PluginError::Session(format!(
1015                            "unknown process `{process_id}`"
1016                        )));
1017                    }
1018                    conn.execute(
1019                        "INSERT OR IGNORE INTO process_wake_acks (process_id, sequence) VALUES (?1, ?2)",
1020                        params![process_id, sequence as i64],
1021                    )
1022                    .map_err(process_sqlite_error)?;
1023                    Ok(())
1024                })())
1025            })
1026            .await
1027            .map_err(process_sqlite_error)?
1028    }
1029
1030    async fn list_non_terminal(&self) -> Result<Vec<ProcessRecord>, lash_core::PluginError> {
1031        self.conn
1032            .call(move |conn| {
1033                Ok((|| {
1034                    let mut stmt = conn
1035                        .prepare(
1036                            "SELECT record_json FROM processes
1037                             WHERE status = 'running'
1038                             ORDER BY process_id ASC",
1039                        )
1040                        .map_err(process_sqlite_error)?;
1041                    let rows = stmt
1042                        .query_map([], |row| row.get::<_, String>(0))
1043                        .map_err(process_sqlite_error)?;
1044                    let mut records = Vec::new();
1045                    for row in rows {
1046                        let record: ProcessRecord =
1047                            serde_json::from_str(&row.map_err(process_sqlite_error)?)
1048                                .map_err(process_decode_error)?;
1049                        records.push(record);
1050                    }
1051                    Ok(records)
1052                })())
1053            })
1054            .await
1055            .map_err(process_sqlite_error)?
1056    }
1057
1058    async fn claim_process_lease(
1059        &self,
1060        process_id: &str,
1061        owner_id: &str,
1062        lease_ttl_ms: u64,
1063    ) -> Result<ProcessLease, lash_core::PluginError> {
1064        let process_id = process_id.to_string();
1065        let owner_id = owner_id.to_string();
1066        self.conn
1067            .write_flow(move |tx| {
1068                Ok(tx_outcome((|| {
1069                    if Self::load_process_conn(tx, &process_id)?.is_none() {
1070                        return Err(lash_core::PluginError::Session(format!(
1071                            "unknown process `{process_id}`"
1072                        )));
1073                    }
1074                    let now = current_epoch_ms();
1075                    let current = Self::load_process_lease_conn(tx, &process_id)?;
1076                    if let Some(current) = current.as_ref()
1077                        && current.expires_at_epoch_ms > now
1078                        && current.owner_id != owner_id
1079                    {
1080                        return Err(process_lease_conflict(&process_id, current));
1081                    }
1082                    // Read the raw fencing token directly: a completed/abandoned
1083                    // lease nulls the owner/token columns but retains the
1084                    // monotonically-increasing `lease_fencing_token`, so a
1085                    // re-claim never reuses a stale writer's token.
1086                    let fencing_token: u64 = tx
1087                        .query_row(
1088                            "SELECT lease_fencing_token FROM process_leases WHERE process_id = ?1",
1089                            params![process_id],
1090                            |row| row.get::<_, i64>(0),
1091                        )
1092                        .optional()
1093                        .map_err(process_sqlite_error)?
1094                        .unwrap_or(0) as u64
1095                        + 1;
1096                    let lease = ProcessLease {
1097                        schema_version: PROCESS_LEASE_SCHEMA_VERSION,
1098                        process_id: process_id.clone(),
1099                        owner_id: owner_id.clone(),
1100                        lease_token: format!(
1101                            "{:x}",
1102                            Sha256::digest(
1103                                format!("{process_id}:{owner_id}:{now}:{fencing_token}").as_bytes()
1104                            )
1105                        ),
1106                        fencing_token,
1107                        claimed_at_epoch_ms: now,
1108                        expires_at_epoch_ms: now.saturating_add(lease_ttl_ms),
1109                    };
1110                    tx.execute(
1111                        "INSERT INTO process_leases (
1112                            process_id, lease_owner_id, lease_token, lease_fencing_token,
1113                            lease_claimed_at_ms, lease_expires_at_ms
1114                         )
1115                         VALUES (?1, ?2, ?3, ?4, ?5, ?6)
1116                         ON CONFLICT(process_id) DO UPDATE SET
1117                            lease_owner_id = excluded.lease_owner_id,
1118                            lease_token = excluded.lease_token,
1119                            lease_fencing_token = excluded.lease_fencing_token,
1120                            lease_claimed_at_ms = excluded.lease_claimed_at_ms,
1121                            lease_expires_at_ms = excluded.lease_expires_at_ms",
1122                        params![
1123                            lease.process_id.as_str(),
1124                            lease.owner_id.as_str(),
1125                            lease.lease_token.as_str(),
1126                            lease.fencing_token as i64,
1127                            lease.claimed_at_epoch_ms as i64,
1128                            lease.expires_at_epoch_ms as i64,
1129                        ],
1130                    )
1131                    .map_err(process_sqlite_error)?;
1132                    Ok(lease)
1133                })()))
1134            })
1135            .await
1136            .map_err(process_sqlite_error)?
1137    }
1138
1139    async fn renew_process_lease(
1140        &self,
1141        lease: &ProcessLease,
1142        lease_ttl_ms: u64,
1143    ) -> Result<ProcessLease, lash_core::PluginError> {
1144        let lease = lease.clone();
1145        self.conn
1146            .write_flow(move |tx| {
1147                Ok(tx_outcome((|| {
1148                    let now = current_epoch_ms();
1149                    let current = Self::load_process_lease_conn(tx, &lease.process_id)?;
1150                    if !guard_lease(current.as_ref(), &lease.lease_token, now) {
1151                        return Err(process_lease_expired(&lease.process_id));
1152                    }
1153                    let renewed = ProcessLease {
1154                        expires_at_epoch_ms: now.saturating_add(lease_ttl_ms),
1155                        ..lease.clone()
1156                    };
1157                    tx.execute(
1158                        "UPDATE process_leases
1159                         SET lease_expires_at_ms = ?2
1160                         WHERE process_id = ?1 AND lease_token = ?3",
1161                        params![
1162                            renewed.process_id.as_str(),
1163                            renewed.expires_at_epoch_ms as i64,
1164                            renewed.lease_token.as_str(),
1165                        ],
1166                    )
1167                    .map_err(process_sqlite_error)?;
1168                    Ok(renewed)
1169                })()))
1170            })
1171            .await
1172            .map_err(process_sqlite_error)?
1173    }
1174
1175    async fn complete_process_lease(
1176        &self,
1177        completion: &ProcessLeaseCompletion,
1178    ) -> Result<(), lash_core::PluginError> {
1179        let process_id = completion.process_id.clone();
1180        let lease_token = completion.lease_token.clone();
1181        self.conn
1182            .call(move |conn| {
1183                conn.execute(
1184                    "UPDATE process_leases
1185                     SET lease_owner_id = NULL,
1186                         lease_token = NULL,
1187                         lease_claimed_at_ms = 0,
1188                         lease_expires_at_ms = 0
1189                     WHERE process_id = ?1 AND lease_token = ?2",
1190                    params![process_id, lease_token],
1191                )
1192            })
1193            .await
1194            .map_err(process_sqlite_error)?;
1195        Ok(())
1196    }
1197}
1198
1199/// Loud, stable error for a fenced process-lease claim on the `PluginError`
1200/// channel the [`ProcessRegistry`] trait returns.
1201fn process_lease_conflict(process_id: &str, current: &ProcessLease) -> lash_core::PluginError {
1202    lash_core::PluginError::Session(format!(
1203        "process `{process_id}` is already leased by `{}` until {}",
1204        current.owner_id, current.expires_at_epoch_ms
1205    ))
1206}
1207
1208/// Loud, stable error for a superseded or expired process lease.
1209fn process_lease_expired(process_id: &str) -> lash_core::PluginError {
1210    lash_core::PluginError::Session(format!(
1211        "process lease for `{process_id}` is missing or expired"
1212    ))
1213}