Skip to main content

lash_sqlite_store/
process_registry.rs

1//! SQLite-backed [`ProcessRegistry`] (`SqliteProcessRegistry`).
2//!
3//! First-party SQLite implementation of the public async process-registry
4//! surface. Every DB body is a *synchronous* rusqlite closure handed
5//! to [`SqliteConnection::call`] (reads) or [`SqliteConnection::write_flow`]
6//! (read-then-write).
7//!
8//! ## Why `write_flow`, not `write`
9//!
10//! The registry's transactional methods produce a [`lash_core::PluginError`],
11//! not a `rusqlite::Error`. `SqliteConnection::write` rolls back only when the
12//! closure returns `Err(rusqlite::Error)`, so a logical `PluginError` (e.g. a
13//! registration-hash conflict) would otherwise *commit* the partial work. Each
14//! such method therefore runs its synchronous body returning
15//! `Result<T, PluginError>` and maps it to a [`TxOutcome`]: `Ok` ⇒
16//! `Commit(Ok(value))`, `Err` ⇒ `Rollback(Err(error))`. That preserves the
17//! prior behaviour of rolling back on every error while still carrying the
18//! `PluginError` back to the caller. The outer `rusqlite::Error` channel only
19//! carries genuine SQLite/connection failures, mapped via `process_sqlite_error`.
20//!
21//! The `*_conn` helpers are synchronous and take a `&rusqlite::Connection` so
22//! they compose inside either closure — including from within a `&Transaction`,
23//! which derefs to `&Connection`.
24
25use super::*;
26
27fn process_status_label(record: &ProcessRecord) -> &'static str {
28    record.status.label()
29}
30
31impl SqliteProcessRegistry {
32    pub async fn open(path: &Path) -> tokio_rusqlite::Result<Self> {
33        let conn = SqliteConnection::open(path).await?;
34        ensure_process_schema(&conn).await?;
35        apply_pragmas(&conn, StoreBacking::File).await?;
36        Ok(Self {
37            conn,
38            notify: tokio::sync::Notify::new(),
39        })
40    }
41
42    pub async fn memory() -> tokio_rusqlite::Result<Self> {
43        let conn = SqliteConnection::open_in_memory().await?;
44        ensure_process_schema(&conn).await?;
45        apply_pragmas(&conn, StoreBacking::Memory).await?;
46        Ok(Self {
47            conn,
48            notify: tokio::sync::Notify::new(),
49        })
50    }
51
52    fn load_process_conn(
53        conn: &Connection,
54        process_id: &str,
55    ) -> Result<Option<ProcessRecord>, lash_core::PluginError> {
56        let json: Option<String> = conn
57            .query_row(
58                "SELECT record_json FROM processes WHERE process_id = ?1",
59                params![process_id],
60                |row| row.get(0),
61            )
62            .optional()
63            .map_err(process_sqlite_error)?;
64        json.map(|json| serde_json::from_str(&json).map_err(process_decode_error))
65            .transpose()
66    }
67
68    fn save_process_conn(
69        conn: &Connection,
70        record: &ProcessRecord,
71    ) -> Result<(), lash_core::PluginError> {
72        conn.execute(
73            "UPDATE processes
74             SET updated_at_ms = ?2, status = ?3, record_json = ?4
75             WHERE process_id = ?1",
76            params![
77                record.id.as_str(),
78                record.updated_at_ms as i64,
79                process_status_label(record),
80                process_encode_json(record)?
81            ],
82        )
83        .map_err(process_sqlite_error)?;
84        Ok(())
85    }
86
87    fn load_event_by_key_conn(
88        conn: &Connection,
89        process_id: &str,
90        replay_key: &str,
91    ) -> Result<Option<(String, ProcessEvent)>, lash_core::PluginError> {
92        let row: Option<(String, String)> = conn
93            .query_row(
94                "SELECT payload_hash, event_json
95                 FROM process_events
96                 WHERE process_id = ?1 AND idempotency_key = ?2",
97                params![process_id, replay_key],
98                |row| Ok((row.get(0)?, row.get(1)?)),
99            )
100            .optional()
101            .map_err(process_sqlite_error)?;
102        row.map(|(hash, json)| {
103            serde_json::from_str(&json)
104                .map(|event| (hash, event))
105                .map_err(process_decode_error)
106        })
107        .transpose()
108    }
109
110    fn load_process_lease_conn(
111        conn: &Connection,
112        process_id: &str,
113    ) -> Result<Option<ProcessLease>, lash_core::PluginError> {
114        conn.query_row(
115            "SELECT lease_owner_id, lease_token, lease_fencing_token,
116                    lease_claimed_at_ms, lease_expires_at_ms
117             FROM process_leases
118             WHERE process_id = ?1",
119            params![process_id],
120            |row| {
121                let owner_id: Option<String> = row.get(0)?;
122                let lease_token: Option<String> = row.get(1)?;
123                let (Some(owner_id), Some(lease_token)) = (owner_id, lease_token) else {
124                    return Ok(None);
125                };
126                Ok(Some(ProcessLease {
127                    schema_version: PROCESS_LEASE_SCHEMA_VERSION,
128                    process_id: process_id.to_string(),
129                    owner_id,
130                    lease_token,
131                    fencing_token: row.get::<_, i64>(2)? as u64,
132                    claimed_at_epoch_ms: row.get::<_, i64>(3)? as u64,
133                    expires_at_epoch_ms: row.get::<_, i64>(4)? as u64,
134                }))
135            },
136        )
137        .optional()
138        .map(|lease| lease.flatten())
139        .map_err(process_sqlite_error)
140    }
141
142    fn list_grants_for_scope_conn(
143        conn: &Connection,
144        session_scope: &SessionScope,
145        live_only: bool,
146    ) -> Result<Vec<ProcessHandleGrantEntry>, lash_core::PluginError> {
147        let session_scope_id = session_scope.id();
148        let status_clause = if live_only {
149            "AND p.status = 'running'"
150        } else {
151            ""
152        };
153        let mut stmt = conn
154            .prepare(&format!(
155                "SELECT g.process_id, g.descriptor_json, p.record_json
156                 FROM process_handle_grants g
157                 JOIN processes p ON p.process_id = g.process_id
158                 WHERE g.scope_id = ?1 {status_clause}
159                 ORDER BY g.process_id ASC"
160            ))
161            .map_err(process_sqlite_error)?;
162        let rows = stmt
163            .query_map(params![session_scope_id.as_str()], |row| {
164                Ok((
165                    row.get::<_, String>(0)?,
166                    row.get::<_, String>(1)?,
167                    row.get::<_, String>(2)?,
168                ))
169            })
170            .map_err(process_sqlite_error)?;
171        let mut entries = Vec::new();
172        for row in rows {
173            let (process_id, descriptor_json, record_json) = row.map_err(process_sqlite_error)?;
174            let descriptor: ProcessHandleDescriptor =
175                serde_json::from_str(&descriptor_json).map_err(process_decode_error)?;
176            let record: ProcessRecord =
177                serde_json::from_str(&record_json).map_err(process_decode_error)?;
178            entries.push((
179                ProcessHandleGrant {
180                    session_id: session_scope.session_id.clone(),
181                    process_id,
182                    descriptor,
183                },
184                record,
185            ));
186        }
187        Ok(entries)
188    }
189}
190
191/// Map a `Result<T, PluginError>` produced by a synchronous transaction body to
192/// a [`TxOutcome`]: commit on success, roll back on logical error. Both arms
193/// carry the inner `Result` back so the caller recovers the value or the
194/// `PluginError` after the transaction resolves.
195fn tx_outcome<T>(
196    result: Result<T, lash_core::PluginError>,
197) -> TxOutcome<Result<T, lash_core::PluginError>> {
198    match result {
199        Ok(value) => TxOutcome::Commit(Ok(value)),
200        Err(err) => TxOutcome::Rollback(Err(err)),
201    }
202}
203
204#[async_trait::async_trait]
205impl ProcessRegistry for SqliteProcessRegistry {
206    fn durability_tier(&self) -> DurabilityTier {
207        DurabilityTier::Durable
208    }
209
210    async fn register_process(
211        &self,
212        registration: ProcessRegistration,
213    ) -> Result<ProcessRecord, lash_core::PluginError> {
214        let (registration, registration_hash) = prepare_process_registration(registration)?;
215        let record = self
216            .conn
217            .write_flow(move |tx| {
218                Ok(tx_outcome((|| {
219                    if let Some(existing) = Self::load_process_conn(tx, &registration.id)? {
220                        if existing.registration_hash == registration_hash {
221                            return Ok(existing);
222                        }
223                        return Err(lash_core::PluginError::Session(format!(
224                            "process `{}` registration hash conflict: existing {}, new {}",
225                            registration.id, existing.registration_hash, registration_hash
226                        )));
227                    }
228                    let now = current_epoch_ms();
229                    let record = ProcessRecord::from_prepared_registration(
230                        registration,
231                        registration_hash,
232                        now,
233                    );
234                    let originator_scope_id = record.originator_scope_id();
235                    tx.execute(
236                        "INSERT INTO processes (
237                            process_id, registration_hash, owner_scope_id,
238                            created_at_ms, updated_at_ms, status, record_json
239                         )
240                         VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7)",
241                        params![
242                            record.id.as_str(),
243                            record.registration_hash.as_str(),
244                            originator_scope_id.as_str(),
245                            record.created_at_ms as i64,
246                            record.updated_at_ms as i64,
247                            process_status_label(&record),
248                            process_encode_json(&record)?,
249                        ],
250                    )
251                    .map_err(process_sqlite_error)?;
252                    Ok(record)
253                })()))
254            })
255            .await
256            .map_err(process_sqlite_error)??;
257        self.notify.notify_waiters();
258        Ok(record)
259    }
260
261    async fn set_external_ref(
262        &self,
263        process_id: &str,
264        external_ref: ProcessExternalRef,
265    ) -> Result<ProcessRecord, lash_core::PluginError> {
266        let process_id = process_id.to_string();
267        let record = self
268            .conn
269            .write_flow(move |tx| {
270                Ok(tx_outcome((|| {
271                    let mut record =
272                        Self::load_process_conn(tx, &process_id)?.ok_or_else(|| {
273                            lash_core::PluginError::Session(format!(
274                                "unknown process `{process_id}`"
275                            ))
276                        })?;
277                    record.external_ref = Some(external_ref);
278                    record.updated_at_ms = current_epoch_ms();
279                    Self::save_process_conn(tx, &record)?;
280                    Ok(record)
281                })()))
282            })
283            .await
284            .map_err(process_sqlite_error)??;
285        self.notify.notify_waiters();
286        Ok(record)
287    }
288
289    async fn grant_handle(
290        &self,
291        session_scope: &SessionScope,
292        process_id: &str,
293        descriptor: ProcessHandleDescriptor,
294    ) -> Result<ProcessHandleGrant, lash_core::PluginError> {
295        let session_scope = session_scope.clone();
296        let process_id = process_id.to_string();
297        self.conn
298            .write_flow(move |tx| {
299                Ok(tx_outcome((|| {
300                    let session_scope_id = session_scope.id();
301                    if Self::load_process_conn(tx, &process_id)?.is_none() {
302                        return Err(lash_core::PluginError::Session(format!(
303                            "unknown process `{process_id}`"
304                        )));
305                    }
306                    tx.execute(
307                        "INSERT INTO process_handle_grants (session_id, scope_id, process_id, descriptor_json)
308                         VALUES (?1, ?2, ?3, ?4)
309                         ON CONFLICT(scope_id, process_id) DO UPDATE SET
310                            session_id = excluded.session_id,
311                            descriptor_json = excluded.descriptor_json",
312                        params![
313                            session_scope.session_id.as_str(),
314                            session_scope_id.as_str(),
315                            process_id.as_str(),
316                            process_encode_json(&descriptor)?
317                        ],
318                    )
319                    .map_err(process_sqlite_error)?;
320                    Ok(ProcessHandleGrant {
321                        session_id: session_scope.session_id.clone(),
322                        process_id: process_id.clone(),
323                        descriptor,
324                    })
325                })()))
326            })
327            .await
328            .map_err(process_sqlite_error)?
329    }
330
331    async fn revoke_handle(
332        &self,
333        session_scope: &SessionScope,
334        process_id: &str,
335    ) -> Result<(), lash_core::PluginError> {
336        let session_scope_id = session_scope.id().as_str().to_string();
337        let process_id = process_id.to_string();
338        self.conn
339            .call(move |conn| {
340                conn.execute(
341                    "DELETE FROM process_handle_grants WHERE scope_id = ?1 AND process_id = ?2",
342                    params![session_scope_id, process_id],
343                )
344            })
345            .await
346            .map_err(process_sqlite_error)?;
347        Ok(())
348    }
349
350    async fn transfer_handle_grants(
351        &self,
352        from_scope: &SessionScope,
353        to_scope: &SessionScope,
354        process_ids: &[String],
355    ) -> Result<(), lash_core::PluginError> {
356        let from_scope = from_scope.clone();
357        let to_scope = to_scope.clone();
358        let process_ids = process_ids.to_vec();
359        self.conn
360            .write_flow(move |tx| {
361                Ok(tx_outcome((|| {
362                    let from_scope_id = from_scope.id();
363                    let to_scope_id = to_scope.id();
364                    for process_id in &process_ids {
365                        let descriptor_json: Option<String> = tx
366                            .query_row(
367                                "SELECT descriptor_json
368                                 FROM process_handle_grants
369                                 WHERE scope_id = ?1 AND process_id = ?2",
370                                params![from_scope_id.as_str(), process_id.as_str()],
371                                |row| row.get(0),
372                            )
373                            .optional()
374                            .map_err(process_sqlite_error)?;
375                        let Some(descriptor_json) = descriptor_json else {
376                            return Err(lash_core::PluginError::Session(format!(
377                                "process handle `{process_id}` is not granted to session `{}`",
378                                from_scope.session_id
379                            )));
380                        };
381                        tx.execute(
382                            "DELETE FROM process_handle_grants
383                             WHERE scope_id = ?1 AND process_id = ?2",
384                            params![from_scope_id.as_str(), process_id.as_str()],
385                        )
386                        .map_err(process_sqlite_error)?;
387                        tx.execute(
388                            "INSERT INTO process_handle_grants (session_id, scope_id, process_id, descriptor_json)
389                             VALUES (?1, ?2, ?3, ?4)
390                             ON CONFLICT(scope_id, process_id) DO UPDATE SET
391                                session_id = excluded.session_id,
392                                descriptor_json = excluded.descriptor_json",
393                            params![
394                                to_scope.session_id.as_str(),
395                                to_scope_id.as_str(),
396                                process_id.as_str(),
397                                descriptor_json
398                            ],
399                        )
400                        .map_err(process_sqlite_error)?;
401                    }
402                    Ok(())
403                })()))
404            })
405            .await
406            .map_err(process_sqlite_error)?
407    }
408
409    async fn list_handle_grants(
410        &self,
411        session_scope: &SessionScope,
412    ) -> Result<Vec<ProcessHandleGrantEntry>, lash_core::PluginError> {
413        let session_scope = session_scope.clone();
414        self.conn
415            .call(move |conn| {
416                Ok(Self::list_grants_for_scope_conn(
417                    conn,
418                    &session_scope,
419                    false,
420                ))
421            })
422            .await
423            .map_err(process_sqlite_error)?
424    }
425
426    async fn list_live_handle_grants(
427        &self,
428        session_scope: &SessionScope,
429    ) -> Result<Vec<ProcessHandleGrantEntry>, lash_core::PluginError> {
430        let session_scope = session_scope.clone();
431        self.conn
432            .call(move |conn| Ok(Self::list_grants_for_scope_conn(conn, &session_scope, true)))
433            .await
434            .map_err(process_sqlite_error)?
435    }
436
437    async fn has_handle_grant(
438        &self,
439        session_scope: &SessionScope,
440        process_id: &str,
441    ) -> Result<bool, lash_core::PluginError> {
442        let session_scope_id = session_scope.id().as_str().to_string();
443        let process_id = process_id.to_string();
444        self.conn
445            .call(move |conn| {
446                let exists = conn
447                    .query_row(
448                        "SELECT 1
449                         FROM process_handle_grants g
450                         JOIN processes p ON p.process_id = g.process_id
451                         WHERE g.scope_id = ?1 AND g.process_id = ?2
452                         LIMIT 1",
453                        params![session_scope_id, process_id],
454                        |_| Ok(()),
455                    )
456                    .optional()?
457                    .is_some();
458                Ok(exists)
459            })
460            .await
461            .map_err(process_sqlite_error)
462    }
463
464    async fn handle_grants_for_process(
465        &self,
466        process_id: &str,
467    ) -> Result<Vec<ProcessHandleGrant>, lash_core::PluginError> {
468        let process_id = process_id.to_string();
469        self.conn
470            .call(move |conn| {
471                Ok((|| {
472                    if Self::load_process_conn(conn, &process_id)?.is_none() {
473                        return Err(lash_core::PluginError::Session(format!(
474                            "unknown process `{process_id}`"
475                        )));
476                    }
477                    let mut stmt = conn
478                        .prepare(
479                            "SELECT session_id, descriptor_json
480                             FROM process_handle_grants
481                             WHERE process_id = ?1
482                             ORDER BY session_id ASC, scope_id ASC",
483                        )
484                        .map_err(process_sqlite_error)?;
485                    let rows = stmt
486                        .query_map(params![process_id], |row| {
487                            Ok((row.get::<_, String>(0)?, row.get::<_, String>(1)?))
488                        })
489                        .map_err(process_sqlite_error)?;
490                    let mut grants = Vec::new();
491                    for row in rows {
492                        let (session_id, descriptor_json) = row.map_err(process_sqlite_error)?;
493                        let descriptor: ProcessHandleDescriptor =
494                            serde_json::from_str(&descriptor_json).map_err(process_decode_error)?;
495                        grants.push(ProcessHandleGrant {
496                            session_id,
497                            process_id: process_id.clone(),
498                            descriptor,
499                        });
500                    }
501                    Ok(grants)
502                })())
503            })
504            .await
505            .map_err(process_sqlite_error)?
506    }
507
508    async fn delete_session_process_state(
509        &self,
510        session_id: &str,
511    ) -> Result<lash_core::ProcessSessionDeleteReport, lash_core::PluginError> {
512        let session_id_owned = session_id.to_string();
513        let (
514            revoked_handle_count,
515            deleted_wake_count,
516            mut orphaned_process_ids,
517            mut preserved_process_ids,
518        ) = self
519            .conn
520            .write_flow(move |tx| {
521                Ok(tx_outcome((|| {
522                    let session_id = session_id_owned;
523                    let removed = {
524                        let mut stmt = tx
525                            .prepare(
526                                "SELECT g.process_id, p.record_json
527                                 FROM process_handle_grants g
528                                 JOIN processes p ON p.process_id = g.process_id
529                                 WHERE g.session_id = ?1
530                                 ORDER BY g.process_id ASC",
531                            )
532                            .map_err(process_sqlite_error)?;
533                        let rows = stmt
534                            .query_map(params![session_id], |row| {
535                                Ok((row.get::<_, String>(0)?, row.get::<_, String>(1)?))
536                            })
537                            .map_err(process_sqlite_error)?;
538                        let mut removed = Vec::new();
539                        for row in rows {
540                            let (process_id, record_json) = row.map_err(process_sqlite_error)?;
541                            let record: ProcessRecord =
542                                serde_json::from_str(&record_json).map_err(process_decode_error)?;
543                            removed.push((process_id, record));
544                        }
545                        removed
546                    };
547
548                    // Wake acknowledgements are process-scoped consumed-event markers.
549                    // Session deletion removes materialized session-addressed deliveries
550                    // through the session store; clearing these rows would re-expose
551                    // already-consumed wakes to surviving grants or future host readers.
552                    let deleted_wake_count = 0;
553                    let revoked_handle_count = tx
554                        .execute(
555                            "DELETE FROM process_handle_grants WHERE session_id = ?1",
556                            params![session_id],
557                        )
558                        .map_err(process_sqlite_error)?;
559                    let mut orphaned_process_ids = Vec::new();
560                    let mut preserved_process_ids = Vec::new();
561                    for (process_id, record) in removed {
562                        if record.is_terminal() {
563                            continue;
564                        }
565                        let remaining_grants: i64 = tx
566                            .query_row(
567                                "SELECT COUNT(*) FROM process_handle_grants WHERE process_id = ?1",
568                                params![process_id],
569                                |row| row.get(0),
570                            )
571                            .map_err(process_sqlite_error)?;
572                        if remaining_grants == 0 {
573                            orphaned_process_ids.push(process_id);
574                        } else {
575                            preserved_process_ids.push(process_id);
576                        }
577                    }
578                    let wake_targeted = {
579                        let mut stmt = tx
580                            .prepare("SELECT process_id, record_json FROM processes ORDER BY process_id ASC")
581                            .map_err(process_sqlite_error)?;
582                        let rows = stmt
583                            .query_map([], |row| {
584                                Ok((row.get::<_, String>(0)?, row.get::<_, String>(1)?))
585                            })
586                            .map_err(process_sqlite_error)?;
587                        let mut records = Vec::new();
588                        for row in rows {
589                            let (process_id, record_json) = row.map_err(process_sqlite_error)?;
590                            let record: ProcessRecord =
591                                serde_json::from_str(&record_json).map_err(process_decode_error)?;
592                            records.push((process_id, record));
593                        }
594                        records
595                    };
596                    for (_process_id, mut record) in wake_targeted {
597                        if record.clear_wake_target_for_session(&session_id) {
598                            Self::save_process_conn(tx, &record)?;
599                        }
600                    }
601                    Ok((
602                        revoked_handle_count,
603                        deleted_wake_count,
604                        orphaned_process_ids,
605                        preserved_process_ids,
606                    ))
607                })()))
608            })
609            .await
610            .map_err(process_sqlite_error)??;
611        orphaned_process_ids.sort();
612        orphaned_process_ids.dedup();
613        preserved_process_ids.sort();
614        preserved_process_ids.dedup();
615        Ok(lash_core::ProcessSessionDeleteReport {
616            session_id: session_id.to_string(),
617            revoked_handle_count,
618            deleted_wake_count,
619            orphaned_process_ids,
620            preserved_process_ids,
621        })
622    }
623
624    async fn append_event(
625        &self,
626        process_id: &str,
627        request: ProcessEventAppendRequest,
628    ) -> Result<ProcessEventAppendResult, lash_core::PluginError> {
629        let process_id = process_id.to_string();
630        let (result, appended) = self
631            .conn
632            .write_flow(move |tx| {
633                Ok(tx_outcome((|| {
634                    let mut record =
635                        Self::load_process_conn(tx, &process_id)?.ok_or_else(|| {
636                            lash_core::PluginError::Session(format!(
637                                "unknown process `{process_id}`"
638                            ))
639                        })?;
640                    let replay_lookup = if let Some(replay_key) =
641                        request.replay.as_ref().map(|replay| replay.key.as_str())
642                    {
643                        Self::load_event_by_key_conn(tx, &process_id, replay_key)?
644                    } else {
645                        None
646                    };
647                    let sequence = tx
648                        .query_row(
649                            "SELECT COALESCE(MAX(sequence), 0) + 1 FROM process_events WHERE process_id = ?1",
650                            params![process_id],
651                            |row| row.get::<_, i64>(0),
652                        )
653                        .map_err(process_sqlite_error)? as u64;
654                    let occurred_at_ms = current_epoch_ms();
655                    let prepared = prepare_process_event_append(
656                        &record,
657                        request,
658                        sequence,
659                        replay_lookup,
660                        occurred_at_ms,
661                    )?;
662                    if prepared.replayed {
663                        return Ok((
664                            ProcessEventAppendResult {
665                                event: prepared.event,
666                                wake_delivery: prepared.wake_delivery,
667                            },
668                            false,
669                        ));
670                    }
671                    let event = prepared.event;
672                    tx.execute(
673                        "INSERT INTO process_events (
674                            process_id, sequence, event_type, payload_hash, idempotency_key,
675                            occurred_at_ms, event_json
676                         )
677                         VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7)",
678                        params![
679                            process_id,
680                            sequence as i64,
681                            event.event_type.as_str(),
682                            prepared.payload_hash.as_str(),
683                            event.invocation.replay_key(),
684                            prepared.occurred_at_ms as i64,
685                            process_encode_json(&event)?,
686                        ],
687                    )
688                    .map_err(process_sqlite_error)?;
689                    if let Some(status) = prepared.status_update.clone() {
690                        record.status = status;
691                        if record.status.is_terminal() {
692                            record.wait = None;
693                        }
694                    }
695                    record.updated_at_ms = prepared.occurred_at_ms;
696                    Self::save_process_conn(tx, &record)?;
697                    Ok((
698                        ProcessEventAppendResult {
699                            event,
700                            wake_delivery: prepared.wake_delivery,
701                        },
702                        true,
703                    ))
704                })()))
705            })
706            .await
707            .map_err(process_sqlite_error)??;
708        if appended {
709            self.notify.notify_waiters();
710        }
711        Ok(result)
712    }
713
714    async fn events_after(
715        &self,
716        process_id: &str,
717        after_sequence: u64,
718    ) -> Result<Vec<ProcessEvent>, lash_core::PluginError> {
719        let process_id = process_id.to_string();
720        self.conn
721            .call(move |conn| {
722                Ok((|| {
723                    if Self::load_process_conn(conn, &process_id)?.is_none() {
724                        return Err(lash_core::PluginError::Session(format!(
725                            "unknown process `{process_id}`"
726                        )));
727                    }
728                    let mut stmt = conn
729                        .prepare(
730                            "SELECT event_json FROM process_events
731                             WHERE process_id = ?1 AND sequence > ?2
732                             ORDER BY sequence ASC",
733                        )
734                        .map_err(process_sqlite_error)?;
735                    let rows = stmt
736                        .query_map(params![process_id, after_sequence as i64], |row| {
737                            row.get::<_, String>(0)
738                        })
739                        .map_err(process_sqlite_error)?;
740                    let mut events = Vec::new();
741                    for row in rows {
742                        events.push(
743                            serde_json::from_str(&row.map_err(process_sqlite_error)?)
744                                .map_err(process_decode_error)?,
745                        );
746                    }
747                    Ok(events)
748                })())
749            })
750            .await
751            .map_err(process_sqlite_error)?
752    }
753
754    async fn count_events_through(
755        &self,
756        process_id: &str,
757        event_type: &str,
758        up_to_sequence: u64,
759    ) -> Result<u64, lash_core::PluginError> {
760        let process_id = process_id.to_string();
761        let event_type = event_type.to_string();
762        self.conn
763            .call(move |conn| {
764                Ok((|| {
765                    if Self::load_process_conn(conn, &process_id)?.is_none() {
766                        return Err(lash_core::PluginError::Session(format!(
767                            "unknown process `{process_id}`"
768                        )));
769                    }
770                    conn.query_row(
771                        "SELECT COUNT(*) FROM process_events
772                         WHERE process_id = ?1 AND event_type = ?2 AND sequence <= ?3",
773                        params![process_id, event_type, up_to_sequence as i64],
774                        |row| row.get::<_, i64>(0),
775                    )
776                    .map(|count| count as u64)
777                    .map_err(process_sqlite_error)
778                })())
779            })
780            .await
781            .map_err(process_sqlite_error)?
782    }
783
784    async fn recent_events(
785        &self,
786        process_id: &str,
787        limit: usize,
788    ) -> Result<Vec<ProcessEvent>, lash_core::PluginError> {
789        let process_id = process_id.to_string();
790        self.conn
791            .call(move |conn| {
792                Ok((|| {
793                    if Self::load_process_conn(conn, &process_id)?.is_none() {
794                        return Err(lash_core::PluginError::Session(format!(
795                            "unknown process `{process_id}`"
796                        )));
797                    }
798                    let mut stmt = conn
799                        .prepare(
800                            "SELECT event_json FROM process_events
801                             WHERE process_id = ?1
802                             ORDER BY sequence DESC
803                             LIMIT ?2",
804                        )
805                        .map_err(process_sqlite_error)?;
806                    let rows = stmt
807                        .query_map(params![process_id, limit as i64], |row| {
808                            row.get::<_, String>(0)
809                        })
810                        .map_err(process_sqlite_error)?;
811                    let mut events: Vec<ProcessEvent> = Vec::new();
812                    for row in rows {
813                        events.push(
814                            serde_json::from_str(&row.map_err(process_sqlite_error)?)
815                                .map_err(process_decode_error)?,
816                        );
817                    }
818                    events.reverse();
819                    Ok(events)
820                })())
821            })
822            .await
823            .map_err(process_sqlite_error)?
824    }
825
826    async fn wake_events_after(
827        &self,
828        process_id: &str,
829        after_sequence: u64,
830    ) -> Result<Vec<ProcessEvent>, lash_core::PluginError> {
831        let acked: std::collections::HashSet<u64> = {
832            let process_id = process_id.to_string();
833            self.conn
834                .call(move |conn| {
835                    Ok(
836                        (|| -> Result<std::collections::HashSet<u64>, lash_core::PluginError> {
837                            let mut stmt = conn
838                                .prepare(
839                                    "SELECT sequence FROM process_wake_acks WHERE process_id = ?1",
840                                )
841                                .map_err(process_sqlite_error)?;
842                            let rows = stmt
843                                .query_map(params![process_id], |row| row.get::<_, i64>(0))
844                                .map_err(process_sqlite_error)?;
845                            let mut set = std::collections::HashSet::new();
846                            for row in rows {
847                                set.insert(row.map_err(process_sqlite_error)? as u64);
848                            }
849                            Ok(set)
850                        })(),
851                    )
852                })
853                .await
854                .map_err(process_sqlite_error)??
855        };
856        Ok(self
857            .events_after(process_id, after_sequence)
858            .await?
859            .into_iter()
860            .filter(|event| event.semantics.wake.is_some() && !acked.contains(&event.sequence))
861            .collect())
862    }
863
864    async fn wait_event_after(
865        &self,
866        process_id: &str,
867        event_type: &str,
868        after_sequence: u64,
869    ) -> Result<ProcessEvent, lash_core::PluginError> {
870        loop {
871            if let Some(event) = self
872                .events_after(process_id, after_sequence)
873                .await?
874                .into_iter()
875                .find(|event| event.event_type == event_type)
876            {
877                return Ok(event);
878            }
879            tokio::select! {
880                _ = self.notify.notified() => {}
881                _ = tokio::time::sleep(Duration::from_millis(50)) => {}
882            }
883        }
884    }
885
886    async fn await_process(
887        &self,
888        process_id: &str,
889    ) -> Result<ProcessAwaitOutput, lash_core::PluginError> {
890        loop {
891            let record = self.get_process(process_id).await.ok_or_else(|| {
892                lash_core::PluginError::Session(format!("unknown process `{process_id}`"))
893            })?;
894            if let Some(await_output) = record.status.await_output() {
895                return Ok(await_output.clone());
896            }
897            tokio::select! {
898                _ = self.notify.notified() => {}
899                _ = tokio::time::sleep(Duration::from_millis(50)) => {}
900            }
901        }
902    }
903
904    async fn complete_process(
905        &self,
906        process_id: &str,
907        await_output: ProcessAwaitOutput,
908    ) -> Result<ProcessRecord, lash_core::PluginError> {
909        let event_type = match await_output.terminal_state() {
910            lash_core::ProcessTerminalState::Completed => "process.completed",
911            lash_core::ProcessTerminalState::Failed => "process.failed",
912            lash_core::ProcessTerminalState::Cancelled => "process.cancelled",
913        };
914        self.append_event(
915            process_id,
916            ProcessEventAppendRequest::new(
917                event_type,
918                serde_json::json!({ "await_output": await_output }),
919            )
920            .with_replay_key(format!("process:{process_id}:terminal:{event_type}")),
921        )
922        .await?;
923        self.get_process(process_id).await.ok_or_else(|| {
924            lash_core::PluginError::Session(format!(
925                "unknown process `{process_id}` after terminal event"
926            ))
927        })
928    }
929
930    async fn set_process_wait(
931        &self,
932        process_id: &str,
933        wait: lash_core::WaitState,
934    ) -> Result<ProcessRecord, lash_core::PluginError> {
935        let process_id = process_id.to_string();
936        self.conn
937            .write_flow(move |tx| {
938                Ok(tx_outcome((|| {
939                    let mut record =
940                        Self::load_process_conn(tx, &process_id)?.ok_or_else(|| {
941                            lash_core::PluginError::Session(format!(
942                                "unknown process `{process_id}`"
943                            ))
944                        })?;
945                    if record.is_terminal() {
946                        return Err(lash_core::PluginError::Session(format!(
947                            "terminal process `{process_id}` cannot enter a wait state"
948                        )));
949                    }
950                    record.wait = Some(wait);
951                    record.updated_at_ms = current_epoch_ms();
952                    Self::save_process_conn(tx, &record)?;
953                    Ok(record)
954                })()))
955            })
956            .await
957            .map_err(process_sqlite_error)?
958    }
959
960    async fn clear_process_wait(
961        &self,
962        process_id: &str,
963    ) -> Result<ProcessRecord, lash_core::PluginError> {
964        let process_id = process_id.to_string();
965        self.conn
966            .write_flow(move |tx| {
967                Ok(tx_outcome((|| {
968                    let mut record =
969                        Self::load_process_conn(tx, &process_id)?.ok_or_else(|| {
970                            lash_core::PluginError::Session(format!(
971                                "unknown process `{process_id}`"
972                            ))
973                        })?;
974                    record.wait = None;
975                    record.updated_at_ms = current_epoch_ms();
976                    Self::save_process_conn(tx, &record)?;
977                    Ok(record)
978                })()))
979            })
980            .await
981            .map_err(process_sqlite_error)?
982    }
983
984    async fn get_process(&self, process_id: &str) -> Option<ProcessRecord> {
985        let process_id = process_id.to_string();
986        self.conn
987            .call(move |conn| Ok(Self::load_process_conn(conn, &process_id).ok().flatten()))
988            .await
989            .ok()
990            .flatten()
991    }
992
993    async fn list_processes(
994        &self,
995        filter: &lash_core::ProcessListFilter,
996    ) -> Result<Vec<ProcessRecord>, lash_core::PluginError> {
997        let filter = filter.clone();
998        self.conn
999            .call(move |conn| {
1000                Ok((|| {
1001                    let mut stmt = conn
1002                        .prepare(
1003                            "SELECT record_json FROM processes
1004                             ORDER BY process_id ASC",
1005                        )
1006                        .map_err(process_sqlite_error)?;
1007                    let rows = stmt
1008                        .query_map([], |row| row.get::<_, String>(0))
1009                        .map_err(process_sqlite_error)?;
1010                    let mut records = Vec::new();
1011                    for row in rows {
1012                        let record: ProcessRecord =
1013                            serde_json::from_str(&row.map_err(process_sqlite_error)?)
1014                                .map_err(process_decode_error)?;
1015                        if filter.matches_record(&record) {
1016                            records.push(record);
1017                        }
1018                    }
1019                    Ok(records)
1020                })())
1021            })
1022            .await
1023            .map_err(process_sqlite_error)?
1024    }
1025
1026    async fn ack_wake(
1027        &self,
1028        process_id: &str,
1029        sequence: u64,
1030    ) -> Result<(), lash_core::PluginError> {
1031        let process_id = process_id.to_string();
1032        self.conn
1033            .call(move |conn| {
1034                Ok((|| {
1035                    if Self::load_process_conn(conn, &process_id)?.is_none() {
1036                        return Err(lash_core::PluginError::Session(format!(
1037                            "unknown process `{process_id}`"
1038                        )));
1039                    }
1040                    conn.execute(
1041                        "INSERT OR IGNORE INTO process_wake_acks (process_id, sequence) VALUES (?1, ?2)",
1042                        params![process_id, sequence as i64],
1043                    )
1044                    .map_err(process_sqlite_error)?;
1045                    Ok(())
1046                })())
1047            })
1048            .await
1049            .map_err(process_sqlite_error)?
1050    }
1051
1052    async fn list_non_terminal(&self) -> Result<Vec<ProcessRecord>, lash_core::PluginError> {
1053        self.conn
1054            .call(move |conn| {
1055                Ok((|| {
1056                    let mut stmt = conn
1057                        .prepare(
1058                            "SELECT record_json FROM processes
1059                             WHERE status = 'running'
1060                             ORDER BY process_id ASC",
1061                        )
1062                        .map_err(process_sqlite_error)?;
1063                    let rows = stmt
1064                        .query_map([], |row| row.get::<_, String>(0))
1065                        .map_err(process_sqlite_error)?;
1066                    let mut records = Vec::new();
1067                    for row in rows {
1068                        let record: ProcessRecord =
1069                            serde_json::from_str(&row.map_err(process_sqlite_error)?)
1070                                .map_err(process_decode_error)?;
1071                        records.push(record);
1072                    }
1073                    Ok(records)
1074                })())
1075            })
1076            .await
1077            .map_err(process_sqlite_error)?
1078    }
1079
1080    async fn claim_process_lease(
1081        &self,
1082        process_id: &str,
1083        owner_id: &str,
1084        lease_ttl_ms: u64,
1085    ) -> Result<ProcessLease, lash_core::PluginError> {
1086        let process_id = process_id.to_string();
1087        let owner_id = owner_id.to_string();
1088        self.conn
1089            .write_flow(move |tx| {
1090                Ok(tx_outcome((|| {
1091                    if Self::load_process_conn(tx, &process_id)?.is_none() {
1092                        return Err(lash_core::PluginError::Session(format!(
1093                            "unknown process `{process_id}`"
1094                        )));
1095                    }
1096                    let now = current_epoch_ms();
1097                    let current = Self::load_process_lease_conn(tx, &process_id)?;
1098                    if let Some(current) = current.as_ref()
1099                        && current.expires_at_epoch_ms > now
1100                        && current.owner_id != owner_id
1101                    {
1102                        return Err(process_lease_conflict(&process_id, current));
1103                    }
1104                    // Read the raw fencing token directly: a completed/abandoned
1105                    // lease nulls the owner/token columns but retains the
1106                    // monotonically-increasing `lease_fencing_token`, so a
1107                    // re-claim never reuses a stale writer's token.
1108                    let fencing_token: u64 = tx
1109                        .query_row(
1110                            "SELECT lease_fencing_token FROM process_leases WHERE process_id = ?1",
1111                            params![process_id],
1112                            |row| row.get::<_, i64>(0),
1113                        )
1114                        .optional()
1115                        .map_err(process_sqlite_error)?
1116                        .unwrap_or(0) as u64
1117                        + 1;
1118                    let lease = ProcessLease {
1119                        schema_version: PROCESS_LEASE_SCHEMA_VERSION,
1120                        process_id: process_id.clone(),
1121                        owner_id: owner_id.clone(),
1122                        lease_token: format!(
1123                            "{:x}",
1124                            Sha256::digest(
1125                                format!("{process_id}:{owner_id}:{now}:{fencing_token}").as_bytes()
1126                            )
1127                        ),
1128                        fencing_token,
1129                        claimed_at_epoch_ms: now,
1130                        expires_at_epoch_ms: now.saturating_add(lease_ttl_ms),
1131                    };
1132                    tx.execute(
1133                        "INSERT INTO process_leases (
1134                            process_id, lease_owner_id, lease_token, lease_fencing_token,
1135                            lease_claimed_at_ms, lease_expires_at_ms
1136                         )
1137                         VALUES (?1, ?2, ?3, ?4, ?5, ?6)
1138                         ON CONFLICT(process_id) DO UPDATE SET
1139                            lease_owner_id = excluded.lease_owner_id,
1140                            lease_token = excluded.lease_token,
1141                            lease_fencing_token = excluded.lease_fencing_token,
1142                            lease_claimed_at_ms = excluded.lease_claimed_at_ms,
1143                            lease_expires_at_ms = excluded.lease_expires_at_ms",
1144                        params![
1145                            lease.process_id.as_str(),
1146                            lease.owner_id.as_str(),
1147                            lease.lease_token.as_str(),
1148                            lease.fencing_token as i64,
1149                            lease.claimed_at_epoch_ms as i64,
1150                            lease.expires_at_epoch_ms as i64,
1151                        ],
1152                    )
1153                    .map_err(process_sqlite_error)?;
1154                    Ok(lease)
1155                })()))
1156            })
1157            .await
1158            .map_err(process_sqlite_error)?
1159    }
1160
1161    async fn renew_process_lease(
1162        &self,
1163        lease: &ProcessLease,
1164        lease_ttl_ms: u64,
1165    ) -> Result<ProcessLease, lash_core::PluginError> {
1166        let lease = lease.clone();
1167        self.conn
1168            .write_flow(move |tx| {
1169                Ok(tx_outcome((|| {
1170                    let now = current_epoch_ms();
1171                    let current = Self::load_process_lease_conn(tx, &lease.process_id)?;
1172                    if !guard_lease(current.as_ref(), &lease.lease_token, now) {
1173                        return Err(process_lease_expired(&lease.process_id));
1174                    }
1175                    let renewed = ProcessLease {
1176                        expires_at_epoch_ms: now.saturating_add(lease_ttl_ms),
1177                        ..lease.clone()
1178                    };
1179                    tx.execute(
1180                        "UPDATE process_leases
1181                         SET lease_expires_at_ms = ?2
1182                         WHERE process_id = ?1 AND lease_token = ?3",
1183                        params![
1184                            renewed.process_id.as_str(),
1185                            renewed.expires_at_epoch_ms as i64,
1186                            renewed.lease_token.as_str(),
1187                        ],
1188                    )
1189                    .map_err(process_sqlite_error)?;
1190                    Ok(renewed)
1191                })()))
1192            })
1193            .await
1194            .map_err(process_sqlite_error)?
1195    }
1196
1197    async fn complete_process_lease(
1198        &self,
1199        completion: &ProcessLeaseCompletion,
1200    ) -> Result<(), lash_core::PluginError> {
1201        let process_id = completion.process_id.clone();
1202        let lease_token = completion.lease_token.clone();
1203        self.conn
1204            .call(move |conn| {
1205                conn.execute(
1206                    "UPDATE process_leases
1207                     SET lease_owner_id = NULL,
1208                         lease_token = NULL,
1209                         lease_claimed_at_ms = 0,
1210                         lease_expires_at_ms = 0
1211                     WHERE process_id = ?1 AND lease_token = ?2",
1212                    params![process_id, lease_token],
1213                )
1214            })
1215            .await
1216            .map_err(process_sqlite_error)?;
1217        Ok(())
1218    }
1219}
1220
1221/// Loud, stable error for a fenced process-lease claim on the `PluginError`
1222/// channel the [`ProcessRegistry`] trait returns.
1223fn process_lease_conflict(process_id: &str, current: &ProcessLease) -> lash_core::PluginError {
1224    lash_core::PluginError::Session(format!(
1225        "process `{process_id}` is already leased by `{}` until {}",
1226        current.owner_id, current.expires_at_epoch_ms
1227    ))
1228}
1229
1230/// Loud, stable error for a superseded or expired process lease.
1231fn process_lease_expired(process_id: &str) -> lash_core::PluginError {
1232    lash_core::PluginError::Session(format!(
1233        "process lease for `{process_id}` is missing or expired"
1234    ))
1235}