Skip to main content

lash_sqlite_store/
process_registry.rs

1//! SQLite-backed [`ProcessRegistry`] (`SqliteProcessRegistry`).
2//!
3//! First-party SQLite implementation of the public async process-registry
4//! surface. Every DB body is a *synchronous* rusqlite closure handed
5//! to [`SqliteConnection::call`] (reads) or [`SqliteConnection::write_flow`]
6//! (read-then-write).
7//!
8//! ## Why `write_flow`, not `write`
9//!
10//! The registry's transactional methods produce a [`lash_core::PluginError`],
11//! not a `rusqlite::Error`. `SqliteConnection::write` rolls back only when the
12//! closure returns `Err(rusqlite::Error)`, so a logical `PluginError` (e.g. a
13//! registration-hash conflict) would otherwise *commit* the partial work. Each
14//! such method therefore runs its synchronous body returning
15//! `Result<T, PluginError>` and maps it to a [`TxOutcome`]: `Ok` ⇒
16//! `Commit(Ok(value))`, `Err` ⇒ `Rollback(Err(error))`. That preserves the
17//! prior behaviour of rolling back on every error while still carrying the
18//! `PluginError` back to the caller. The outer `rusqlite::Error` channel only
19//! carries genuine SQLite/connection failures, mapped via `process_sqlite_error`.
20//!
21//! The `*_conn` helpers are synchronous and take a `&rusqlite::Connection` so
22//! they compose inside either closure — including from within a `&Transaction`,
23//! which derefs to `&Connection`.
24
25use super::*;
26
27fn process_status_label(record: &ProcessRecord) -> &'static str {
28    record.status.label()
29}
30
31impl SqliteProcessRegistry {
32    pub async fn open(path: &Path) -> tokio_rusqlite::Result<Self> {
33        let conn = SqliteConnection::open(path).await?;
34        ensure_process_schema(&conn).await?;
35        apply_pragmas(&conn, StoreBacking::File).await?;
36        Ok(Self {
37            conn,
38            notify: tokio::sync::Notify::new(),
39        })
40    }
41
42    pub async fn memory() -> tokio_rusqlite::Result<Self> {
43        let conn = SqliteConnection::open_in_memory().await?;
44        ensure_process_schema(&conn).await?;
45        apply_pragmas(&conn, StoreBacking::Memory).await?;
46        Ok(Self {
47            conn,
48            notify: tokio::sync::Notify::new(),
49        })
50    }
51
52    fn load_process_conn(
53        conn: &Connection,
54        process_id: &str,
55    ) -> Result<Option<ProcessRecord>, lash_core::PluginError> {
56        let json: Option<String> = conn
57            .query_row(
58                "SELECT record_json FROM processes WHERE process_id = ?1",
59                params![process_id],
60                |row| row.get(0),
61            )
62            .optional()
63            .map_err(process_sqlite_error)?;
64        json.map(|json| serde_json::from_str(&json).map_err(process_decode_error))
65            .transpose()
66    }
67
68    fn save_process_conn(
69        conn: &Connection,
70        record: &ProcessRecord,
71    ) -> Result<(), lash_core::PluginError> {
72        conn.execute(
73            "UPDATE processes
74             SET updated_at_ms = ?2, status = ?3, record_json = ?4
75             WHERE process_id = ?1",
76            params![
77                record.id.as_str(),
78                record.updated_at_ms as i64,
79                process_status_label(record),
80                process_encode_json(record)?
81            ],
82        )
83        .map_err(process_sqlite_error)?;
84        Ok(())
85    }
86
87    fn load_event_by_key_conn(
88        conn: &Connection,
89        process_id: &str,
90        replay_key: &str,
91    ) -> Result<Option<(String, ProcessEvent)>, lash_core::PluginError> {
92        let row: Option<(String, String)> = conn
93            .query_row(
94                "SELECT payload_hash, event_json
95                 FROM process_events
96                 WHERE process_id = ?1 AND idempotency_key = ?2",
97                params![process_id, replay_key],
98                |row| Ok((row.get(0)?, row.get(1)?)),
99            )
100            .optional()
101            .map_err(process_sqlite_error)?;
102        row.map(|(hash, json)| {
103            serde_json::from_str(&json)
104                .map(|event| (hash, event))
105                .map_err(process_decode_error)
106        })
107        .transpose()
108    }
109
110    fn load_process_lease_conn(
111        conn: &Connection,
112        process_id: &str,
113    ) -> Result<Option<ProcessLease>, lash_core::PluginError> {
114        conn.query_row(
115            "SELECT lease_owner_id, lease_token, lease_fencing_token,
116                    lease_claimed_at_ms, lease_expires_at_ms
117             FROM process_leases
118             WHERE process_id = ?1",
119            params![process_id],
120            |row| {
121                let owner_id: Option<String> = row.get(0)?;
122                let lease_token: Option<String> = row.get(1)?;
123                let (Some(owner_id), Some(lease_token)) = (owner_id, lease_token) else {
124                    return Ok(None);
125                };
126                Ok(Some(ProcessLease {
127                    schema_version: PROCESS_LEASE_SCHEMA_VERSION,
128                    process_id: process_id.to_string(),
129                    owner_id,
130                    lease_token,
131                    fencing_token: row.get::<_, i64>(2)? as u64,
132                    claimed_at_epoch_ms: row.get::<_, i64>(3)? as u64,
133                    expires_at_epoch_ms: row.get::<_, i64>(4)? as u64,
134                }))
135            },
136        )
137        .optional()
138        .map(|lease| lease.flatten())
139        .map_err(process_sqlite_error)
140    }
141
142    fn list_grants_for_scope_conn(
143        conn: &Connection,
144        session_scope: &SessionScope,
145        live_only: bool,
146    ) -> Result<Vec<ProcessHandleGrantEntry>, lash_core::PluginError> {
147        let session_scope_id = session_scope.id();
148        let status_clause = if live_only {
149            "AND p.status = 'running'"
150        } else {
151            ""
152        };
153        let mut stmt = conn
154            .prepare(&format!(
155                "SELECT g.process_id, g.descriptor_json, p.record_json
156                 FROM process_handle_grants g
157                 JOIN processes p ON p.process_id = g.process_id
158                 WHERE g.scope_id = ?1 {status_clause}
159                 ORDER BY g.process_id ASC"
160            ))
161            .map_err(process_sqlite_error)?;
162        let rows = stmt
163            .query_map(params![session_scope_id.as_str()], |row| {
164                Ok((
165                    row.get::<_, String>(0)?,
166                    row.get::<_, String>(1)?,
167                    row.get::<_, String>(2)?,
168                ))
169            })
170            .map_err(process_sqlite_error)?;
171        let mut entries = Vec::new();
172        for row in rows {
173            let (process_id, descriptor_json, record_json) = row.map_err(process_sqlite_error)?;
174            let descriptor: ProcessHandleDescriptor =
175                serde_json::from_str(&descriptor_json).map_err(process_decode_error)?;
176            let record: ProcessRecord =
177                serde_json::from_str(&record_json).map_err(process_decode_error)?;
178            entries.push((
179                ProcessHandleGrant {
180                    session_id: session_scope.session_id.clone(),
181                    process_id,
182                    descriptor,
183                },
184                record,
185            ));
186        }
187        Ok(entries)
188    }
189}
190
191/// Map a `Result<T, PluginError>` produced by a synchronous transaction body to
192/// a [`TxOutcome`]: commit on success, roll back on logical error. Both arms
193/// carry the inner `Result` back so the caller recovers the value or the
194/// `PluginError` after the transaction resolves.
195fn tx_outcome<T>(
196    result: Result<T, lash_core::PluginError>,
197) -> TxOutcome<Result<T, lash_core::PluginError>> {
198    match result {
199        Ok(value) => TxOutcome::Commit(Ok(value)),
200        Err(err) => TxOutcome::Rollback(Err(err)),
201    }
202}
203
204#[async_trait::async_trait]
205impl ProcessRegistry for SqliteProcessRegistry {
206    fn durability_tier(&self) -> DurabilityTier {
207        DurabilityTier::Durable
208    }
209
210    async fn register_process(
211        &self,
212        registration: ProcessRegistration,
213    ) -> Result<ProcessRecord, lash_core::PluginError> {
214        let (registration, registration_hash) = prepare_process_registration(registration)?;
215        let record = self
216            .conn
217            .write_flow(move |tx| {
218                Ok(tx_outcome((|| {
219                    if let Some(existing) = Self::load_process_conn(tx, &registration.id)? {
220                        if existing.registration_hash == registration_hash {
221                            return Ok(existing);
222                        }
223                        return Err(lash_core::PluginError::Session(format!(
224                            "process `{}` registration hash conflict: existing {}, new {}",
225                            registration.id, existing.registration_hash, registration_hash
226                        )));
227                    }
228                    let now = current_epoch_ms();
229                    let record = ProcessRecord::from_prepared_registration(
230                        registration,
231                        registration_hash,
232                        now,
233                    );
234                    let originator_scope_id = record.originator_scope_id();
235                    tx.execute(
236                        "INSERT INTO processes (
237                            process_id, registration_hash, owner_scope_id,
238                            created_at_ms, updated_at_ms, status, record_json
239                         )
240                         VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7)",
241                        params![
242                            record.id.as_str(),
243                            record.registration_hash.as_str(),
244                            originator_scope_id.as_str(),
245                            record.created_at_ms as i64,
246                            record.updated_at_ms as i64,
247                            process_status_label(&record),
248                            process_encode_json(&record)?,
249                        ],
250                    )
251                    .map_err(process_sqlite_error)?;
252                    Ok(record)
253                })()))
254            })
255            .await
256            .map_err(process_sqlite_error)??;
257        self.notify.notify_waiters();
258        Ok(record)
259    }
260
261    async fn set_external_ref(
262        &self,
263        process_id: &str,
264        external_ref: ProcessExternalRef,
265    ) -> Result<ProcessRecord, lash_core::PluginError> {
266        let process_id = process_id.to_string();
267        let (record, changed) = self
268            .conn
269            .write_flow(move |tx| {
270                Ok(tx_outcome((|| {
271                    let mut record =
272                        Self::load_process_conn(tx, &process_id)?.ok_or_else(|| {
273                            lash_core::PluginError::Session(format!(
274                                "unknown process `{process_id}`"
275                            ))
276                        })?;
277                    if let Some(existing) = &record.external_ref {
278                        if existing == &external_ref {
279                            return Ok((record, false));
280                        }
281                        return Err(process_external_ref_conflict(
282                            &process_id,
283                            existing,
284                            &external_ref,
285                        ));
286                    }
287                    record.external_ref = Some(external_ref);
288                    record.updated_at_ms = current_epoch_ms();
289                    Self::save_process_conn(tx, &record)?;
290                    Ok((record, true))
291                })()))
292            })
293            .await
294            .map_err(process_sqlite_error)??;
295        if changed {
296            self.notify.notify_waiters();
297        }
298        Ok(record)
299    }
300
301    async fn grant_handle(
302        &self,
303        session_scope: &SessionScope,
304        process_id: &str,
305        descriptor: ProcessHandleDescriptor,
306    ) -> Result<ProcessHandleGrant, lash_core::PluginError> {
307        let session_scope = session_scope.clone();
308        let process_id = process_id.to_string();
309        self.conn
310            .write_flow(move |tx| {
311                Ok(tx_outcome((|| {
312                    let session_scope_id = session_scope.id();
313                    if Self::load_process_conn(tx, &process_id)?.is_none() {
314                        return Err(lash_core::PluginError::Session(format!(
315                            "unknown process `{process_id}`"
316                        )));
317                    }
318                    tx.execute(
319                        "INSERT INTO process_handle_grants (session_id, scope_id, process_id, descriptor_json)
320                         VALUES (?1, ?2, ?3, ?4)
321                         ON CONFLICT(scope_id, process_id) DO UPDATE SET
322                            session_id = excluded.session_id,
323                            descriptor_json = excluded.descriptor_json",
324                        params![
325                            session_scope.session_id.as_str(),
326                            session_scope_id.as_str(),
327                            process_id.as_str(),
328                            process_encode_json(&descriptor)?
329                        ],
330                    )
331                    .map_err(process_sqlite_error)?;
332                    Ok(ProcessHandleGrant {
333                        session_id: session_scope.session_id.clone(),
334                        process_id: process_id.clone(),
335                        descriptor,
336                    })
337                })()))
338            })
339            .await
340            .map_err(process_sqlite_error)?
341    }
342
343    async fn revoke_handle(
344        &self,
345        session_scope: &SessionScope,
346        process_id: &str,
347    ) -> Result<(), lash_core::PluginError> {
348        let session_scope_id = session_scope.id().as_str().to_string();
349        let process_id = process_id.to_string();
350        self.conn
351            .call(move |conn| {
352                conn.execute(
353                    "DELETE FROM process_handle_grants WHERE scope_id = ?1 AND process_id = ?2",
354                    params![session_scope_id, process_id],
355                )
356            })
357            .await
358            .map_err(process_sqlite_error)?;
359        Ok(())
360    }
361
362    async fn transfer_handle_grants(
363        &self,
364        from_scope: &SessionScope,
365        to_scope: &SessionScope,
366        process_ids: &[String],
367    ) -> Result<(), lash_core::PluginError> {
368        let from_scope = from_scope.clone();
369        let to_scope = to_scope.clone();
370        let process_ids = process_ids.to_vec();
371        self.conn
372            .write_flow(move |tx| {
373                Ok(tx_outcome((|| {
374                    let from_scope_id = from_scope.id();
375                    let to_scope_id = to_scope.id();
376                    for process_id in &process_ids {
377                        let descriptor_json: Option<String> = tx
378                            .query_row(
379                                "SELECT descriptor_json
380                                 FROM process_handle_grants
381                                 WHERE scope_id = ?1 AND process_id = ?2",
382                                params![from_scope_id.as_str(), process_id.as_str()],
383                                |row| row.get(0),
384                            )
385                            .optional()
386                            .map_err(process_sqlite_error)?;
387                        let Some(descriptor_json) = descriptor_json else {
388                            return Err(lash_core::PluginError::Session(format!(
389                                "process handle `{process_id}` is not granted to session `{}`",
390                                from_scope.session_id
391                            )));
392                        };
393                        tx.execute(
394                            "DELETE FROM process_handle_grants
395                             WHERE scope_id = ?1 AND process_id = ?2",
396                            params![from_scope_id.as_str(), process_id.as_str()],
397                        )
398                        .map_err(process_sqlite_error)?;
399                        tx.execute(
400                            "INSERT INTO process_handle_grants (session_id, scope_id, process_id, descriptor_json)
401                             VALUES (?1, ?2, ?3, ?4)
402                             ON CONFLICT(scope_id, process_id) DO UPDATE SET
403                                session_id = excluded.session_id,
404                                descriptor_json = excluded.descriptor_json",
405                            params![
406                                to_scope.session_id.as_str(),
407                                to_scope_id.as_str(),
408                                process_id.as_str(),
409                                descriptor_json
410                            ],
411                        )
412                        .map_err(process_sqlite_error)?;
413                    }
414                    Ok(())
415                })()))
416            })
417            .await
418            .map_err(process_sqlite_error)?
419    }
420
421    async fn list_handle_grants(
422        &self,
423        session_scope: &SessionScope,
424    ) -> Result<Vec<ProcessHandleGrantEntry>, lash_core::PluginError> {
425        let session_scope = session_scope.clone();
426        self.conn
427            .call(move |conn| {
428                Ok(Self::list_grants_for_scope_conn(
429                    conn,
430                    &session_scope,
431                    false,
432                ))
433            })
434            .await
435            .map_err(process_sqlite_error)?
436    }
437
438    async fn list_live_handle_grants(
439        &self,
440        session_scope: &SessionScope,
441    ) -> Result<Vec<ProcessHandleGrantEntry>, lash_core::PluginError> {
442        let session_scope = session_scope.clone();
443        self.conn
444            .call(move |conn| Ok(Self::list_grants_for_scope_conn(conn, &session_scope, true)))
445            .await
446            .map_err(process_sqlite_error)?
447    }
448
449    async fn has_handle_grant(
450        &self,
451        session_scope: &SessionScope,
452        process_id: &str,
453    ) -> Result<bool, lash_core::PluginError> {
454        let session_scope_id = session_scope.id().as_str().to_string();
455        let process_id = process_id.to_string();
456        self.conn
457            .call(move |conn| {
458                let exists = conn
459                    .query_row(
460                        "SELECT 1
461                         FROM process_handle_grants g
462                         JOIN processes p ON p.process_id = g.process_id
463                         WHERE g.scope_id = ?1 AND g.process_id = ?2
464                         LIMIT 1",
465                        params![session_scope_id, process_id],
466                        |_| Ok(()),
467                    )
468                    .optional()?
469                    .is_some();
470                Ok(exists)
471            })
472            .await
473            .map_err(process_sqlite_error)
474    }
475
476    async fn handle_grants_for_process(
477        &self,
478        process_id: &str,
479    ) -> Result<Vec<ProcessHandleGrant>, lash_core::PluginError> {
480        let process_id = process_id.to_string();
481        self.conn
482            .call(move |conn| {
483                Ok((|| {
484                    if Self::load_process_conn(conn, &process_id)?.is_none() {
485                        return Err(lash_core::PluginError::Session(format!(
486                            "unknown process `{process_id}`"
487                        )));
488                    }
489                    let mut stmt = conn
490                        .prepare(
491                            "SELECT session_id, descriptor_json
492                             FROM process_handle_grants
493                             WHERE process_id = ?1
494                             ORDER BY session_id ASC, scope_id ASC",
495                        )
496                        .map_err(process_sqlite_error)?;
497                    let rows = stmt
498                        .query_map(params![process_id], |row| {
499                            Ok((row.get::<_, String>(0)?, row.get::<_, String>(1)?))
500                        })
501                        .map_err(process_sqlite_error)?;
502                    let mut grants = Vec::new();
503                    for row in rows {
504                        let (session_id, descriptor_json) = row.map_err(process_sqlite_error)?;
505                        let descriptor: ProcessHandleDescriptor =
506                            serde_json::from_str(&descriptor_json).map_err(process_decode_error)?;
507                        grants.push(ProcessHandleGrant {
508                            session_id,
509                            process_id: process_id.clone(),
510                            descriptor,
511                        });
512                    }
513                    Ok(grants)
514                })())
515            })
516            .await
517            .map_err(process_sqlite_error)?
518    }
519
520    async fn delete_session_process_state(
521        &self,
522        session_id: &str,
523    ) -> Result<lash_core::ProcessSessionDeleteReport, lash_core::PluginError> {
524        let session_id_owned = session_id.to_string();
525        let (
526            revoked_handle_count,
527            deleted_wake_count,
528            mut orphaned_process_ids,
529            mut preserved_process_ids,
530        ) = self
531            .conn
532            .write_flow(move |tx| {
533                Ok(tx_outcome((|| {
534                    let session_id = session_id_owned;
535                    let removed = {
536                        let mut stmt = tx
537                            .prepare(
538                                "SELECT g.process_id, p.record_json
539                                 FROM process_handle_grants g
540                                 JOIN processes p ON p.process_id = g.process_id
541                                 WHERE g.session_id = ?1
542                                 ORDER BY g.process_id ASC",
543                            )
544                            .map_err(process_sqlite_error)?;
545                        let rows = stmt
546                            .query_map(params![session_id], |row| {
547                                Ok((row.get::<_, String>(0)?, row.get::<_, String>(1)?))
548                            })
549                            .map_err(process_sqlite_error)?;
550                        let mut removed = Vec::new();
551                        for row in rows {
552                            let (process_id, record_json) = row.map_err(process_sqlite_error)?;
553                            let record: ProcessRecord =
554                                serde_json::from_str(&record_json).map_err(process_decode_error)?;
555                            removed.push((process_id, record));
556                        }
557                        removed
558                    };
559
560                    // Wake acknowledgements are process-scoped consumed-event markers.
561                    // Session deletion removes materialized session-addressed deliveries
562                    // through the session store; clearing these rows would re-expose
563                    // already-consumed wakes to surviving grants or future host readers.
564                    let deleted_wake_count = 0;
565                    let revoked_handle_count = tx
566                        .execute(
567                            "DELETE FROM process_handle_grants WHERE session_id = ?1",
568                            params![session_id],
569                        )
570                        .map_err(process_sqlite_error)?;
571                    let mut orphaned_process_ids = Vec::new();
572                    let mut preserved_process_ids = Vec::new();
573                    for (process_id, record) in removed {
574                        if record.is_terminal() {
575                            continue;
576                        }
577                        let remaining_grants: i64 = tx
578                            .query_row(
579                                "SELECT COUNT(*) FROM process_handle_grants WHERE process_id = ?1",
580                                params![process_id],
581                                |row| row.get(0),
582                            )
583                            .map_err(process_sqlite_error)?;
584                        if remaining_grants == 0 {
585                            orphaned_process_ids.push(process_id);
586                        } else {
587                            preserved_process_ids.push(process_id);
588                        }
589                    }
590                    let wake_targeted = {
591                        let mut stmt = tx
592                            .prepare("SELECT process_id, record_json FROM processes ORDER BY process_id ASC")
593                            .map_err(process_sqlite_error)?;
594                        let rows = stmt
595                            .query_map([], |row| {
596                                Ok((row.get::<_, String>(0)?, row.get::<_, String>(1)?))
597                            })
598                            .map_err(process_sqlite_error)?;
599                        let mut records = Vec::new();
600                        for row in rows {
601                            let (process_id, record_json) = row.map_err(process_sqlite_error)?;
602                            let record: ProcessRecord =
603                                serde_json::from_str(&record_json).map_err(process_decode_error)?;
604                            records.push((process_id, record));
605                        }
606                        records
607                    };
608                    for (_process_id, mut record) in wake_targeted {
609                        if record.clear_wake_target_for_session(&session_id) {
610                            Self::save_process_conn(tx, &record)?;
611                        }
612                    }
613                    Ok((
614                        revoked_handle_count,
615                        deleted_wake_count,
616                        orphaned_process_ids,
617                        preserved_process_ids,
618                    ))
619                })()))
620            })
621            .await
622            .map_err(process_sqlite_error)??;
623        orphaned_process_ids.sort();
624        orphaned_process_ids.dedup();
625        preserved_process_ids.sort();
626        preserved_process_ids.dedup();
627        Ok(lash_core::ProcessSessionDeleteReport {
628            session_id: session_id.to_string(),
629            revoked_handle_count,
630            deleted_wake_count,
631            orphaned_process_ids,
632            preserved_process_ids,
633        })
634    }
635
636    async fn append_event(
637        &self,
638        process_id: &str,
639        request: ProcessEventAppendRequest,
640    ) -> Result<ProcessEventAppendResult, lash_core::PluginError> {
641        let process_id = process_id.to_string();
642        let (result, appended) = self
643            .conn
644            .write_flow(move |tx| {
645                Ok(tx_outcome((|| {
646                    let mut record =
647                        Self::load_process_conn(tx, &process_id)?.ok_or_else(|| {
648                            lash_core::PluginError::Session(format!(
649                                "unknown process `{process_id}`"
650                            ))
651                        })?;
652                    let replay_lookup = if let Some(replay_key) =
653                        request.replay.as_ref().map(|replay| replay.key.as_str())
654                    {
655                        Self::load_event_by_key_conn(tx, &process_id, replay_key)?
656                    } else {
657                        None
658                    };
659                    let sequence = tx
660                        .query_row(
661                            "SELECT COALESCE(MAX(sequence), 0) + 1 FROM process_events WHERE process_id = ?1",
662                            params![process_id],
663                            |row| row.get::<_, i64>(0),
664                        )
665                        .map_err(process_sqlite_error)? as u64;
666                    let occurred_at_ms = current_epoch_ms();
667                    let prepared = prepare_process_event_append(
668                        &record,
669                        request,
670                        sequence,
671                        replay_lookup,
672                        occurred_at_ms,
673                    )?;
674                    match prepared {
675                        lash_core::ProcessEventAppendPlan::Replay {
676                            event,
677                            repair_status,
678                            wake_delivery,
679                            occurred_at_ms,
680                        } => {
681                            let repaired = if let Some(status) = repair_status {
682                                lash_core::apply_process_status_projection(
683                                    &mut record,
684                                    status,
685                                    occurred_at_ms,
686                                );
687                                Self::save_process_conn(tx, &record)?;
688                                true
689                            } else {
690                                false
691                            };
692                            Ok((
693                                ProcessEventAppendResult {
694                                    event,
695                                    wake_delivery,
696                                },
697                                repaired,
698                            ))
699                        }
700                        lash_core::ProcessEventAppendPlan::Insert {
701                            event,
702                            payload_hash,
703                            status_update,
704                            wake_delivery,
705                            occurred_at_ms,
706                        } => {
707                            tx.execute(
708                                "INSERT INTO process_events (
709                                    process_id, sequence, event_type, payload_hash, idempotency_key,
710                                    occurred_at_ms, event_json
711                                 )
712                                 VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7)",
713                                params![
714                                    process_id,
715                                    sequence as i64,
716                                    event.event_type.as_str(),
717                                    payload_hash.as_str(),
718                                    event.invocation.replay_key(),
719                                    occurred_at_ms as i64,
720                                    process_encode_json(&event)?,
721                                ],
722                            )
723                            .map_err(process_sqlite_error)?;
724                            if let Some(status) = status_update {
725                                lash_core::apply_process_status_projection(
726                                    &mut record,
727                                    status,
728                                    occurred_at_ms,
729                                );
730                            } else {
731                                record.updated_at_ms = occurred_at_ms;
732                            }
733                            Self::save_process_conn(tx, &record)?;
734                            Ok((
735                                ProcessEventAppendResult {
736                                    event,
737                                    wake_delivery,
738                                },
739                                true,
740                            ))
741                        }
742                    }
743                })()))
744            })
745            .await
746            .map_err(process_sqlite_error)??;
747        if appended {
748            self.notify.notify_waiters();
749        }
750        Ok(result)
751    }
752
753    async fn events_after(
754        &self,
755        process_id: &str,
756        after_sequence: u64,
757    ) -> Result<Vec<ProcessEvent>, lash_core::PluginError> {
758        let process_id = process_id.to_string();
759        self.conn
760            .call(move |conn| {
761                Ok((|| {
762                    if Self::load_process_conn(conn, &process_id)?.is_none() {
763                        return Err(lash_core::PluginError::Session(format!(
764                            "unknown process `{process_id}`"
765                        )));
766                    }
767                    let mut stmt = conn
768                        .prepare(
769                            "SELECT event_json FROM process_events
770                             WHERE process_id = ?1 AND sequence > ?2
771                             ORDER BY sequence ASC",
772                        )
773                        .map_err(process_sqlite_error)?;
774                    let rows = stmt
775                        .query_map(params![process_id, after_sequence as i64], |row| {
776                            row.get::<_, String>(0)
777                        })
778                        .map_err(process_sqlite_error)?;
779                    let mut events = Vec::new();
780                    for row in rows {
781                        events.push(
782                            serde_json::from_str(&row.map_err(process_sqlite_error)?)
783                                .map_err(process_decode_error)?,
784                        );
785                    }
786                    Ok(events)
787                })())
788            })
789            .await
790            .map_err(process_sqlite_error)?
791    }
792
793    async fn count_events_through(
794        &self,
795        process_id: &str,
796        event_type: &str,
797        up_to_sequence: u64,
798    ) -> Result<u64, lash_core::PluginError> {
799        let process_id = process_id.to_string();
800        let event_type = event_type.to_string();
801        self.conn
802            .call(move |conn| {
803                Ok((|| {
804                    if Self::load_process_conn(conn, &process_id)?.is_none() {
805                        return Err(lash_core::PluginError::Session(format!(
806                            "unknown process `{process_id}`"
807                        )));
808                    }
809                    conn.query_row(
810                        "SELECT COUNT(*) FROM process_events
811                         WHERE process_id = ?1 AND event_type = ?2 AND sequence <= ?3",
812                        params![process_id, event_type, up_to_sequence as i64],
813                        |row| row.get::<_, i64>(0),
814                    )
815                    .map(|count| count as u64)
816                    .map_err(process_sqlite_error)
817                })())
818            })
819            .await
820            .map_err(process_sqlite_error)?
821    }
822
823    async fn recent_events(
824        &self,
825        process_id: &str,
826        limit: usize,
827    ) -> Result<Vec<ProcessEvent>, lash_core::PluginError> {
828        let process_id = process_id.to_string();
829        self.conn
830            .call(move |conn| {
831                Ok((|| {
832                    if Self::load_process_conn(conn, &process_id)?.is_none() {
833                        return Err(lash_core::PluginError::Session(format!(
834                            "unknown process `{process_id}`"
835                        )));
836                    }
837                    let mut stmt = conn
838                        .prepare(
839                            "SELECT event_json FROM process_events
840                             WHERE process_id = ?1
841                             ORDER BY sequence DESC
842                             LIMIT ?2",
843                        )
844                        .map_err(process_sqlite_error)?;
845                    let rows = stmt
846                        .query_map(params![process_id, limit as i64], |row| {
847                            row.get::<_, String>(0)
848                        })
849                        .map_err(process_sqlite_error)?;
850                    let mut events: Vec<ProcessEvent> = Vec::new();
851                    for row in rows {
852                        events.push(
853                            serde_json::from_str(&row.map_err(process_sqlite_error)?)
854                                .map_err(process_decode_error)?,
855                        );
856                    }
857                    events.reverse();
858                    Ok(events)
859                })())
860            })
861            .await
862            .map_err(process_sqlite_error)?
863    }
864
865    async fn wake_events_after(
866        &self,
867        process_id: &str,
868        after_sequence: u64,
869    ) -> Result<Vec<ProcessEvent>, lash_core::PluginError> {
870        let acked: std::collections::HashSet<u64> = {
871            let process_id = process_id.to_string();
872            self.conn
873                .call(move |conn| {
874                    Ok(
875                        (|| -> Result<std::collections::HashSet<u64>, lash_core::PluginError> {
876                            let mut stmt = conn
877                                .prepare(
878                                    "SELECT sequence FROM process_wake_acks WHERE process_id = ?1",
879                                )
880                                .map_err(process_sqlite_error)?;
881                            let rows = stmt
882                                .query_map(params![process_id], |row| row.get::<_, i64>(0))
883                                .map_err(process_sqlite_error)?;
884                            let mut set = std::collections::HashSet::new();
885                            for row in rows {
886                                set.insert(row.map_err(process_sqlite_error)? as u64);
887                            }
888                            Ok(set)
889                        })(),
890                    )
891                })
892                .await
893                .map_err(process_sqlite_error)??
894        };
895        Ok(self
896            .events_after(process_id, after_sequence)
897            .await?
898            .into_iter()
899            .filter(|event| event.semantics.wake.is_some() && !acked.contains(&event.sequence))
900            .collect())
901    }
902
903    async fn wait_event_after(
904        &self,
905        process_id: &str,
906        event_type: &str,
907        after_sequence: u64,
908    ) -> Result<ProcessEvent, lash_core::PluginError> {
909        loop {
910            if let Some(event) = self
911                .events_after(process_id, after_sequence)
912                .await?
913                .into_iter()
914                .find(|event| event.event_type == event_type)
915            {
916                return Ok(event);
917            }
918            tokio::select! {
919                _ = self.notify.notified() => {}
920                _ = tokio::time::sleep(Duration::from_millis(50)) => {}
921            }
922        }
923    }
924
925    async fn await_process(
926        &self,
927        process_id: &str,
928    ) -> Result<ProcessAwaitOutput, lash_core::PluginError> {
929        loop {
930            let record = self.get_process(process_id).await.ok_or_else(|| {
931                lash_core::PluginError::Session(format!("unknown process `{process_id}`"))
932            })?;
933            if let Some(await_output) = record.status.await_output() {
934                return Ok(await_output.clone());
935            }
936            tokio::select! {
937                _ = self.notify.notified() => {}
938                _ = tokio::time::sleep(Duration::from_millis(50)) => {}
939            }
940        }
941    }
942
943    async fn complete_process(
944        &self,
945        process_id: &str,
946        await_output: ProcessAwaitOutput,
947    ) -> Result<ProcessRecord, lash_core::PluginError> {
948        let event_type = match await_output.terminal_state() {
949            lash_core::ProcessTerminalState::Completed => "process.completed",
950            lash_core::ProcessTerminalState::Failed => "process.failed",
951            lash_core::ProcessTerminalState::Cancelled => "process.cancelled",
952        };
953        self.append_event(
954            process_id,
955            ProcessEventAppendRequest::new(
956                event_type,
957                serde_json::json!({ "await_output": await_output }),
958            )
959            .with_replay_key(format!("process:{process_id}:terminal:{event_type}")),
960        )
961        .await?;
962        self.get_process(process_id).await.ok_or_else(|| {
963            lash_core::PluginError::Session(format!(
964                "unknown process `{process_id}` after terminal event"
965            ))
966        })
967    }
968
969    async fn set_process_wait(
970        &self,
971        process_id: &str,
972        wait: lash_core::WaitState,
973    ) -> Result<ProcessRecord, lash_core::PluginError> {
974        let process_id = process_id.to_string();
975        self.conn
976            .write_flow(move |tx| {
977                Ok(tx_outcome((|| {
978                    let mut record =
979                        Self::load_process_conn(tx, &process_id)?.ok_or_else(|| {
980                            lash_core::PluginError::Session(format!(
981                                "unknown process `{process_id}`"
982                            ))
983                        })?;
984                    if record.is_terminal() {
985                        return Err(lash_core::PluginError::Session(format!(
986                            "terminal process `{process_id}` cannot enter a wait state"
987                        )));
988                    }
989                    record.wait = Some(wait);
990                    record.updated_at_ms = current_epoch_ms();
991                    Self::save_process_conn(tx, &record)?;
992                    Ok(record)
993                })()))
994            })
995            .await
996            .map_err(process_sqlite_error)?
997    }
998
999    async fn clear_process_wait(
1000        &self,
1001        process_id: &str,
1002    ) -> Result<ProcessRecord, lash_core::PluginError> {
1003        let process_id = process_id.to_string();
1004        self.conn
1005            .write_flow(move |tx| {
1006                Ok(tx_outcome((|| {
1007                    let mut record =
1008                        Self::load_process_conn(tx, &process_id)?.ok_or_else(|| {
1009                            lash_core::PluginError::Session(format!(
1010                                "unknown process `{process_id}`"
1011                            ))
1012                        })?;
1013                    record.wait = None;
1014                    record.updated_at_ms = current_epoch_ms();
1015                    Self::save_process_conn(tx, &record)?;
1016                    Ok(record)
1017                })()))
1018            })
1019            .await
1020            .map_err(process_sqlite_error)?
1021    }
1022
1023    async fn get_process(&self, process_id: &str) -> Option<ProcessRecord> {
1024        let process_id = process_id.to_string();
1025        self.conn
1026            .call(move |conn| Ok(Self::load_process_conn(conn, &process_id).ok().flatten()))
1027            .await
1028            .ok()
1029            .flatten()
1030    }
1031
1032    async fn list_processes(
1033        &self,
1034        filter: &lash_core::ProcessListFilter,
1035    ) -> Result<Vec<ProcessRecord>, lash_core::PluginError> {
1036        let filter = filter.clone();
1037        self.conn
1038            .call(move |conn| {
1039                Ok((|| {
1040                    let mut stmt = conn
1041                        .prepare(
1042                            "SELECT record_json FROM processes
1043                             ORDER BY process_id ASC",
1044                        )
1045                        .map_err(process_sqlite_error)?;
1046                    let rows = stmt
1047                        .query_map([], |row| row.get::<_, String>(0))
1048                        .map_err(process_sqlite_error)?;
1049                    let mut records = Vec::new();
1050                    for row in rows {
1051                        let record: ProcessRecord =
1052                            serde_json::from_str(&row.map_err(process_sqlite_error)?)
1053                                .map_err(process_decode_error)?;
1054                        if filter.matches_record(&record) {
1055                            records.push(record);
1056                        }
1057                    }
1058                    Ok(records)
1059                })())
1060            })
1061            .await
1062            .map_err(process_sqlite_error)?
1063    }
1064
1065    async fn ack_wake(
1066        &self,
1067        process_id: &str,
1068        sequence: u64,
1069    ) -> Result<(), lash_core::PluginError> {
1070        let process_id = process_id.to_string();
1071        self.conn
1072            .call(move |conn| {
1073                Ok((|| {
1074                    if Self::load_process_conn(conn, &process_id)?.is_none() {
1075                        return Err(lash_core::PluginError::Session(format!(
1076                            "unknown process `{process_id}`"
1077                        )));
1078                    }
1079                    conn.execute(
1080                        "INSERT OR IGNORE INTO process_wake_acks (process_id, sequence) VALUES (?1, ?2)",
1081                        params![process_id, sequence as i64],
1082                    )
1083                    .map_err(process_sqlite_error)?;
1084                    Ok(())
1085                })())
1086            })
1087            .await
1088            .map_err(process_sqlite_error)?
1089    }
1090
1091    async fn list_non_terminal(&self) -> Result<Vec<ProcessRecord>, lash_core::PluginError> {
1092        self.conn
1093            .call(move |conn| {
1094                Ok((|| {
1095                    let mut stmt = conn
1096                        .prepare(
1097                            "SELECT record_json FROM processes
1098                             WHERE status = 'running'
1099                             ORDER BY process_id ASC",
1100                        )
1101                        .map_err(process_sqlite_error)?;
1102                    let rows = stmt
1103                        .query_map([], |row| row.get::<_, String>(0))
1104                        .map_err(process_sqlite_error)?;
1105                    let mut records = Vec::new();
1106                    for row in rows {
1107                        let record: ProcessRecord =
1108                            serde_json::from_str(&row.map_err(process_sqlite_error)?)
1109                                .map_err(process_decode_error)?;
1110                        records.push(record);
1111                    }
1112                    Ok(records)
1113                })())
1114            })
1115            .await
1116            .map_err(process_sqlite_error)?
1117    }
1118
1119    async fn claim_process_lease(
1120        &self,
1121        process_id: &str,
1122        owner_id: &str,
1123        lease_ttl_ms: u64,
1124    ) -> Result<ProcessLease, lash_core::PluginError> {
1125        let process_id = process_id.to_string();
1126        let owner_id = owner_id.to_string();
1127        self.conn
1128            .write_flow(move |tx| {
1129                Ok(tx_outcome((|| {
1130                    if Self::load_process_conn(tx, &process_id)?.is_none() {
1131                        return Err(lash_core::PluginError::Session(format!(
1132                            "unknown process `{process_id}`"
1133                        )));
1134                    }
1135                    let now = current_epoch_ms();
1136                    let current = Self::load_process_lease_conn(tx, &process_id)?;
1137                    if let Some(current) = current.as_ref()
1138                        && current.expires_at_epoch_ms > now
1139                        && current.owner_id != owner_id
1140                    {
1141                        return Err(process_lease_conflict(&process_id, current));
1142                    }
1143                    // Read the raw fencing token directly: a completed/abandoned
1144                    // lease nulls the owner/token columns but retains the
1145                    // monotonically-increasing `lease_fencing_token`, so a
1146                    // re-claim never reuses a stale writer's token.
1147                    let fencing_token: u64 = tx
1148                        .query_row(
1149                            "SELECT lease_fencing_token FROM process_leases WHERE process_id = ?1",
1150                            params![process_id],
1151                            |row| row.get::<_, i64>(0),
1152                        )
1153                        .optional()
1154                        .map_err(process_sqlite_error)?
1155                        .unwrap_or(0) as u64
1156                        + 1;
1157                    let lease = ProcessLease {
1158                        schema_version: PROCESS_LEASE_SCHEMA_VERSION,
1159                        process_id: process_id.clone(),
1160                        owner_id: owner_id.clone(),
1161                        lease_token: format!(
1162                            "{:x}",
1163                            Sha256::digest(
1164                                format!("{process_id}:{owner_id}:{now}:{fencing_token}").as_bytes()
1165                            )
1166                        ),
1167                        fencing_token,
1168                        claimed_at_epoch_ms: now,
1169                        expires_at_epoch_ms: now.saturating_add(lease_ttl_ms),
1170                    };
1171                    tx.execute(
1172                        "INSERT INTO process_leases (
1173                            process_id, lease_owner_id, lease_token, lease_fencing_token,
1174                            lease_claimed_at_ms, lease_expires_at_ms
1175                         )
1176                         VALUES (?1, ?2, ?3, ?4, ?5, ?6)
1177                         ON CONFLICT(process_id) DO UPDATE SET
1178                            lease_owner_id = excluded.lease_owner_id,
1179                            lease_token = excluded.lease_token,
1180                            lease_fencing_token = excluded.lease_fencing_token,
1181                            lease_claimed_at_ms = excluded.lease_claimed_at_ms,
1182                            lease_expires_at_ms = excluded.lease_expires_at_ms",
1183                        params![
1184                            lease.process_id.as_str(),
1185                            lease.owner_id.as_str(),
1186                            lease.lease_token.as_str(),
1187                            lease.fencing_token as i64,
1188                            lease.claimed_at_epoch_ms as i64,
1189                            lease.expires_at_epoch_ms as i64,
1190                        ],
1191                    )
1192                    .map_err(process_sqlite_error)?;
1193                    Ok(lease)
1194                })()))
1195            })
1196            .await
1197            .map_err(process_sqlite_error)?
1198    }
1199
1200    async fn renew_process_lease(
1201        &self,
1202        lease: &ProcessLease,
1203        lease_ttl_ms: u64,
1204    ) -> Result<ProcessLease, lash_core::PluginError> {
1205        let lease = lease.clone();
1206        self.conn
1207            .write_flow(move |tx| {
1208                Ok(tx_outcome((|| {
1209                    let now = current_epoch_ms();
1210                    let current = Self::load_process_lease_conn(tx, &lease.process_id)?;
1211                    if !guard_lease(current.as_ref(), &lease.lease_token, now) {
1212                        return Err(process_lease_expired(&lease.process_id));
1213                    }
1214                    let renewed = ProcessLease {
1215                        expires_at_epoch_ms: now.saturating_add(lease_ttl_ms),
1216                        ..lease.clone()
1217                    };
1218                    tx.execute(
1219                        "UPDATE process_leases
1220                         SET lease_expires_at_ms = ?2
1221                         WHERE process_id = ?1 AND lease_token = ?3",
1222                        params![
1223                            renewed.process_id.as_str(),
1224                            renewed.expires_at_epoch_ms as i64,
1225                            renewed.lease_token.as_str(),
1226                        ],
1227                    )
1228                    .map_err(process_sqlite_error)?;
1229                    Ok(renewed)
1230                })()))
1231            })
1232            .await
1233            .map_err(process_sqlite_error)?
1234    }
1235
1236    async fn complete_process_lease(
1237        &self,
1238        completion: &ProcessLeaseCompletion,
1239    ) -> Result<(), lash_core::PluginError> {
1240        let process_id = completion.process_id.clone();
1241        let lease_token = completion.lease_token.clone();
1242        self.conn
1243            .call(move |conn| {
1244                conn.execute(
1245                    "UPDATE process_leases
1246                     SET lease_owner_id = NULL,
1247                         lease_token = NULL,
1248                         lease_claimed_at_ms = 0,
1249                         lease_expires_at_ms = 0
1250                     WHERE process_id = ?1 AND lease_token = ?2",
1251                    params![process_id, lease_token],
1252                )
1253            })
1254            .await
1255            .map_err(process_sqlite_error)?;
1256        Ok(())
1257    }
1258}
1259
1260/// Loud, stable error for a fenced process-lease claim on the `PluginError`
1261/// channel the [`ProcessRegistry`] trait returns.
1262fn process_lease_conflict(process_id: &str, current: &ProcessLease) -> lash_core::PluginError {
1263    lash_core::PluginError::Session(format!(
1264        "process `{process_id}` is already leased by `{}` until {}",
1265        current.owner_id, current.expires_at_epoch_ms
1266    ))
1267}
1268
1269/// Loud, stable error for a superseded or expired process lease.
1270fn process_lease_expired(process_id: &str) -> lash_core::PluginError {
1271    lash_core::PluginError::Session(format!(
1272        "process lease for `{process_id}` is missing or expired"
1273    ))
1274}
1275
1276fn process_external_ref_conflict(
1277    process_id: &str,
1278    existing: &ProcessExternalRef,
1279    new: &ProcessExternalRef,
1280) -> lash_core::PluginError {
1281    lash_core::PluginError::Session(format!(
1282        "process `{process_id}` external ref conflict: existing {existing:?}, new {new:?}"
1283    ))
1284}