Skip to main content

lash_sqlite_store/
process_registry.rs

//! SQLite-backed [`ProcessRegistry`] (`SqliteProcessRegistry`).
//!
//! First-party SQLite implementation of the public async process-registry
//! surface. Every DB body is a *synchronous* rusqlite closure handed
//! to [`SqliteConnection::call`] (reads) or [`SqliteConnection::write_flow`]
//! (read-then-write).
//!
//! ## Why `write_flow`, not `write`
//!
//! The registry's transactional methods produce a [`lash_core::PluginError`],
//! not a `rusqlite::Error`. `SqliteConnection::write` rolls back only when the
//! closure returns `Err(rusqlite::Error)`, so a logical `PluginError` (e.g. a
//! registration-hash conflict) would otherwise *commit* the partial work. Each
//! such method therefore runs its synchronous body returning
//! `Result<T, PluginError>` and maps it to a [`TxOutcome`]: `Ok` ⇒
//! `Commit(Ok(value))`, `Err` ⇒ `Rollback(Err(error))`. That preserves the
//! prior behaviour of rolling back on every error while still carrying the
//! `PluginError` back to the caller. The outer `rusqlite::Error` channel only
//! carries genuine SQLite/connection failures, mapped via `process_sqlite_error`.
//!
//! The `*_conn` helpers are synchronous and take a `&rusqlite::Connection` so
//! they compose inside either closure — including from within a `&Transaction`,
//! which derefs to `&Connection`.
24
25use super::*;
26
27fn process_status_label(record: &ProcessRecord) -> &'static str {
28    record.status.label()
29}
30
31impl SqliteProcessRegistry {
32    pub async fn open(path: &Path) -> tokio_rusqlite::Result<Self> {
33        let conn = SqliteConnection::open(path).await?;
34        ensure_process_schema(&conn).await?;
35        apply_pragmas(&conn, StoreBacking::File).await?;
36        Ok(Self {
37            conn,
38            notify: tokio::sync::Notify::new(),
39        })
40    }
41
42    pub async fn memory() -> tokio_rusqlite::Result<Self> {
43        let conn = SqliteConnection::open_in_memory().await?;
44        ensure_process_schema(&conn).await?;
45        apply_pragmas(&conn, StoreBacking::Memory).await?;
46        Ok(Self {
47            conn,
48            notify: tokio::sync::Notify::new(),
49        })
50    }
51
52    fn load_process_conn(
53        conn: &Connection,
54        process_id: &str,
55    ) -> Result<Option<ProcessRecord>, lash_core::PluginError> {
56        let json: Option<String> = conn
57            .query_row(
58                "SELECT record_json FROM processes WHERE process_id = ?1",
59                params![process_id],
60                |row| row.get(0),
61            )
62            .optional()
63            .map_err(process_sqlite_error)?;
64        json.map(|json| serde_json::from_str(&json).map_err(process_decode_error))
65            .transpose()
66    }
67
68    fn save_process_conn(
69        conn: &Connection,
70        record: &ProcessRecord,
71    ) -> Result<(), lash_core::PluginError> {
72        conn.execute(
73            "UPDATE processes
74             SET updated_at_ms = ?2, status = ?3, record_json = ?4
75             WHERE process_id = ?1",
76            params![
77                record.id.as_str(),
78                record.updated_at_ms as i64,
79                process_status_label(record),
80                process_encode_json(record)?
81            ],
82        )
83        .map_err(process_sqlite_error)?;
84        Ok(())
85    }
86
87    fn load_event_by_key_conn(
88        conn: &Connection,
89        process_id: &str,
90        replay_key: &str,
91    ) -> Result<Option<(String, ProcessEvent)>, lash_core::PluginError> {
92        let row: Option<(String, String)> = conn
93            .query_row(
94                "SELECT payload_hash, event_json
95                 FROM process_events
96                 WHERE process_id = ?1 AND idempotency_key = ?2",
97                params![process_id, replay_key],
98                |row| Ok((row.get(0)?, row.get(1)?)),
99            )
100            .optional()
101            .map_err(process_sqlite_error)?;
102        row.map(|(hash, json)| {
103            serde_json::from_str(&json)
104                .map(|event| (hash, event))
105                .map_err(process_decode_error)
106        })
107        .transpose()
108    }
109
110    fn load_process_lease_conn(
111        conn: &Connection,
112        process_id: &str,
113    ) -> Result<Option<ProcessLease>, lash_core::PluginError> {
114        conn.query_row(
115            "SELECT lease_owner_id, lease_token, lease_fencing_token,
116                    lease_claimed_at_ms, lease_expires_at_ms
117             FROM process_leases
118             WHERE process_id = ?1",
119            params![process_id],
120            |row| {
121                let owner_id: Option<String> = row.get(0)?;
122                let lease_token: Option<String> = row.get(1)?;
123                let (Some(owner_id), Some(lease_token)) = (owner_id, lease_token) else {
124                    return Ok(None);
125                };
126                Ok(Some(ProcessLease {
127                    schema_version: PROCESS_LEASE_SCHEMA_VERSION,
128                    process_id: process_id.to_string(),
129                    owner_id,
130                    lease_token,
131                    fencing_token: row.get::<_, i64>(2)? as u64,
132                    claimed_at_epoch_ms: row.get::<_, i64>(3)? as u64,
133                    expires_at_epoch_ms: row.get::<_, i64>(4)? as u64,
134                }))
135            },
136        )
137        .optional()
138        .map(|lease| lease.flatten())
139        .map_err(process_sqlite_error)
140    }
141
142    fn list_grants_for_scope_conn(
143        conn: &Connection,
144        session_scope: &SessionScope,
145        live_only: bool,
146    ) -> Result<Vec<ProcessHandleGrantEntry>, lash_core::PluginError> {
147        let session_scope_id = session_scope.id();
148        let status_clause = if live_only {
149            "AND p.status = 'running'"
150        } else {
151            ""
152        };
153        let mut stmt = conn
154            .prepare(&format!(
155                "SELECT g.process_id, g.descriptor_json, p.record_json
156                 FROM process_handle_grants g
157                 JOIN processes p ON p.process_id = g.process_id
158                 WHERE g.scope_id = ?1 {status_clause}
159                 ORDER BY g.process_id ASC"
160            ))
161            .map_err(process_sqlite_error)?;
162        let rows = stmt
163            .query_map(params![session_scope_id.as_str()], |row| {
164                Ok((
165                    row.get::<_, String>(0)?,
166                    row.get::<_, String>(1)?,
167                    row.get::<_, String>(2)?,
168                ))
169            })
170            .map_err(process_sqlite_error)?;
171        let mut entries = Vec::new();
172        for row in rows {
173            let (process_id, descriptor_json, record_json) = row.map_err(process_sqlite_error)?;
174            let descriptor: ProcessHandleDescriptor =
175                serde_json::from_str(&descriptor_json).map_err(process_decode_error)?;
176            let record: ProcessRecord =
177                serde_json::from_str(&record_json).map_err(process_decode_error)?;
178            entries.push((
179                ProcessHandleGrant {
180                    session_id: session_scope.session_id.clone(),
181                    process_id,
182                    descriptor,
183                },
184                record,
185            ));
186        }
187        Ok(entries)
188    }
189}
190
191/// Map a `Result<T, PluginError>` produced by a synchronous transaction body to
192/// a [`TxOutcome`]: commit on success, roll back on logical error. Both arms
193/// carry the inner `Result` back so the caller recovers the value or the
194/// `PluginError` after the transaction resolves.
195fn tx_outcome<T>(
196    result: Result<T, lash_core::PluginError>,
197) -> TxOutcome<Result<T, lash_core::PluginError>> {
198    match result {
199        Ok(value) => TxOutcome::Commit(Ok(value)),
200        Err(err) => TxOutcome::Rollback(Err(err)),
201    }
202}
203
204#[async_trait::async_trait]
205impl ProcessRegistry for SqliteProcessRegistry {
206    fn durability_tier(&self) -> DurabilityTier {
207        DurabilityTier::Durable
208    }
209
210    async fn register_process(
211        &self,
212        registration: ProcessRegistration,
213    ) -> Result<ProcessRecord, lash_core::PluginError> {
214        let (registration, registration_hash) = prepare_process_registration(registration)?;
215        let record = self
216            .conn
217            .write_flow(move |tx| {
218                Ok(tx_outcome((|| {
219                    if let Some(existing) = Self::load_process_conn(tx, &registration.id)? {
220                        if existing.registration_hash == registration_hash {
221                            return Ok(existing);
222                        }
223                        return Err(lash_core::PluginError::Session(format!(
224                            "process `{}` registration hash conflict: existing {}, new {}",
225                            registration.id, existing.registration_hash, registration_hash
226                        )));
227                    }
228                    let now = current_epoch_ms();
229                    let record = ProcessRecord::from_prepared_registration(
230                        registration,
231                        registration_hash,
232                        now,
233                    );
234                    let originator_scope_id = record.originator_scope_id();
235                    tx.execute(
236                        "INSERT INTO processes (
237                            process_id, registration_hash, owner_scope_id,
238                            created_at_ms, updated_at_ms, status, record_json
239                         )
240                         VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7)",
241                        params![
242                            record.id.as_str(),
243                            record.registration_hash.as_str(),
244                            originator_scope_id.as_str(),
245                            record.created_at_ms as i64,
246                            record.updated_at_ms as i64,
247                            process_status_label(&record),
248                            process_encode_json(&record)?,
249                        ],
250                    )
251                    .map_err(process_sqlite_error)?;
252                    Ok(record)
253                })()))
254            })
255            .await
256            .map_err(process_sqlite_error)??;
257        self.notify.notify_waiters();
258        Ok(record)
259    }
260
261    async fn set_external_ref(
262        &self,
263        process_id: &str,
264        external_ref: ProcessExternalRef,
265    ) -> Result<ProcessRecord, lash_core::PluginError> {
266        let process_id = process_id.to_string();
267        let record = self
268            .conn
269            .write_flow(move |tx| {
270                Ok(tx_outcome((|| {
271                    let mut record =
272                        Self::load_process_conn(tx, &process_id)?.ok_or_else(|| {
273                            lash_core::PluginError::Session(format!(
274                                "unknown process `{process_id}`"
275                            ))
276                        })?;
277                    record.external_ref = Some(external_ref);
278                    record.updated_at_ms = current_epoch_ms();
279                    Self::save_process_conn(tx, &record)?;
280                    Ok(record)
281                })()))
282            })
283            .await
284            .map_err(process_sqlite_error)??;
285        self.notify.notify_waiters();
286        Ok(record)
287    }
288
289    async fn grant_handle(
290        &self,
291        session_scope: &SessionScope,
292        process_id: &str,
293        descriptor: ProcessHandleDescriptor,
294    ) -> Result<ProcessHandleGrant, lash_core::PluginError> {
295        let session_scope = session_scope.clone();
296        let process_id = process_id.to_string();
297        self.conn
298            .write_flow(move |tx| {
299                Ok(tx_outcome((|| {
300                    let session_scope_id = session_scope.id();
301                    if Self::load_process_conn(tx, &process_id)?.is_none() {
302                        return Err(lash_core::PluginError::Session(format!(
303                            "unknown process `{process_id}`"
304                        )));
305                    }
306                    tx.execute(
307                        "INSERT INTO process_handle_grants (session_id, scope_id, process_id, descriptor_json)
308                         VALUES (?1, ?2, ?3, ?4)
309                         ON CONFLICT(scope_id, process_id) DO UPDATE SET
310                            session_id = excluded.session_id,
311                            descriptor_json = excluded.descriptor_json",
312                        params![
313                            session_scope.session_id.as_str(),
314                            session_scope_id.as_str(),
315                            process_id.as_str(),
316                            process_encode_json(&descriptor)?
317                        ],
318                    )
319                    .map_err(process_sqlite_error)?;
320                    Ok(ProcessHandleGrant {
321                        session_id: session_scope.session_id.clone(),
322                        process_id: process_id.clone(),
323                        descriptor,
324                    })
325                })()))
326            })
327            .await
328            .map_err(process_sqlite_error)?
329    }
330
331    async fn revoke_handle(
332        &self,
333        session_scope: &SessionScope,
334        process_id: &str,
335    ) -> Result<(), lash_core::PluginError> {
336        let session_scope_id = session_scope.id().as_str().to_string();
337        let process_id = process_id.to_string();
338        self.conn
339            .call(move |conn| {
340                conn.execute(
341                    "DELETE FROM process_handle_grants WHERE scope_id = ?1 AND process_id = ?2",
342                    params![session_scope_id, process_id],
343                )
344            })
345            .await
346            .map_err(process_sqlite_error)?;
347        Ok(())
348    }
349
350    async fn transfer_handle_grants(
351        &self,
352        from_scope: &SessionScope,
353        to_scope: &SessionScope,
354        process_ids: &[String],
355    ) -> Result<(), lash_core::PluginError> {
356        let from_scope = from_scope.clone();
357        let to_scope = to_scope.clone();
358        let process_ids = process_ids.to_vec();
359        self.conn
360            .write_flow(move |tx| {
361                Ok(tx_outcome((|| {
362                    let from_scope_id = from_scope.id();
363                    let to_scope_id = to_scope.id();
364                    for process_id in &process_ids {
365                        let descriptor_json: Option<String> = tx
366                            .query_row(
367                                "SELECT descriptor_json
368                                 FROM process_handle_grants
369                                 WHERE scope_id = ?1 AND process_id = ?2",
370                                params![from_scope_id.as_str(), process_id.as_str()],
371                                |row| row.get(0),
372                            )
373                            .optional()
374                            .map_err(process_sqlite_error)?;
375                        let Some(descriptor_json) = descriptor_json else {
376                            return Err(lash_core::PluginError::Session(format!(
377                                "process handle `{process_id}` is not granted to session `{}`",
378                                from_scope.session_id
379                            )));
380                        };
381                        tx.execute(
382                            "DELETE FROM process_handle_grants
383                             WHERE scope_id = ?1 AND process_id = ?2",
384                            params![from_scope_id.as_str(), process_id.as_str()],
385                        )
386                        .map_err(process_sqlite_error)?;
387                        tx.execute(
388                            "INSERT INTO process_handle_grants (session_id, scope_id, process_id, descriptor_json)
389                             VALUES (?1, ?2, ?3, ?4)
390                             ON CONFLICT(scope_id, process_id) DO UPDATE SET
391                                session_id = excluded.session_id,
392                                descriptor_json = excluded.descriptor_json",
393                            params![
394                                to_scope.session_id.as_str(),
395                                to_scope_id.as_str(),
396                                process_id.as_str(),
397                                descriptor_json
398                            ],
399                        )
400                        .map_err(process_sqlite_error)?;
401                    }
402                    Ok(())
403                })()))
404            })
405            .await
406            .map_err(process_sqlite_error)?
407    }
408
409    async fn list_handle_grants(
410        &self,
411        session_scope: &SessionScope,
412    ) -> Result<Vec<ProcessHandleGrantEntry>, lash_core::PluginError> {
413        let session_scope = session_scope.clone();
414        self.conn
415            .call(move |conn| {
416                Ok(Self::list_grants_for_scope_conn(
417                    conn,
418                    &session_scope,
419                    false,
420                ))
421            })
422            .await
423            .map_err(process_sqlite_error)?
424    }
425
426    async fn list_live_handle_grants(
427        &self,
428        session_scope: &SessionScope,
429    ) -> Result<Vec<ProcessHandleGrantEntry>, lash_core::PluginError> {
430        let session_scope = session_scope.clone();
431        self.conn
432            .call(move |conn| Ok(Self::list_grants_for_scope_conn(conn, &session_scope, true)))
433            .await
434            .map_err(process_sqlite_error)?
435    }
436
437    async fn has_handle_grant(
438        &self,
439        session_scope: &SessionScope,
440        process_id: &str,
441    ) -> Result<bool, lash_core::PluginError> {
442        let session_scope_id = session_scope.id().as_str().to_string();
443        let process_id = process_id.to_string();
444        self.conn
445            .call(move |conn| {
446                let exists = conn
447                    .query_row(
448                        "SELECT 1
449                         FROM process_handle_grants g
450                         JOIN processes p ON p.process_id = g.process_id
451                         WHERE g.scope_id = ?1 AND g.process_id = ?2
452                         LIMIT 1",
453                        params![session_scope_id, process_id],
454                        |_| Ok(()),
455                    )
456                    .optional()?
457                    .is_some();
458                Ok(exists)
459            })
460            .await
461            .map_err(process_sqlite_error)
462    }
463
464    async fn handle_grants_for_process(
465        &self,
466        process_id: &str,
467    ) -> Result<Vec<ProcessHandleGrant>, lash_core::PluginError> {
468        let process_id = process_id.to_string();
469        self.conn
470            .call(move |conn| {
471                Ok((|| {
472                    if Self::load_process_conn(conn, &process_id)?.is_none() {
473                        return Err(lash_core::PluginError::Session(format!(
474                            "unknown process `{process_id}`"
475                        )));
476                    }
477                    let mut stmt = conn
478                        .prepare(
479                            "SELECT session_id, descriptor_json
480                             FROM process_handle_grants
481                             WHERE process_id = ?1
482                             ORDER BY session_id ASC, scope_id ASC",
483                        )
484                        .map_err(process_sqlite_error)?;
485                    let rows = stmt
486                        .query_map(params![process_id], |row| {
487                            Ok((row.get::<_, String>(0)?, row.get::<_, String>(1)?))
488                        })
489                        .map_err(process_sqlite_error)?;
490                    let mut grants = Vec::new();
491                    for row in rows {
492                        let (session_id, descriptor_json) = row.map_err(process_sqlite_error)?;
493                        let descriptor: ProcessHandleDescriptor =
494                            serde_json::from_str(&descriptor_json).map_err(process_decode_error)?;
495                        grants.push(ProcessHandleGrant {
496                            session_id,
497                            process_id: process_id.clone(),
498                            descriptor,
499                        });
500                    }
501                    Ok(grants)
502                })())
503            })
504            .await
505            .map_err(process_sqlite_error)?
506    }
507
508    async fn delete_session_process_state(
509        &self,
510        session_id: &str,
511    ) -> Result<lash_core::ProcessSessionDeleteReport, lash_core::PluginError> {
512        let session_id_owned = session_id.to_string();
513        let (
514            revoked_handle_count,
515            deleted_wake_count,
516            mut orphaned_process_ids,
517            mut preserved_process_ids,
518        ) = self
519            .conn
520            .write_flow(move |tx| {
521                Ok(tx_outcome((|| {
522                    let session_id = session_id_owned;
523                    let removed = {
524                        let mut stmt = tx
525                            .prepare(
526                                "SELECT g.process_id, p.record_json
527                                 FROM process_handle_grants g
528                                 JOIN processes p ON p.process_id = g.process_id
529                                 WHERE g.session_id = ?1
530                                 ORDER BY g.process_id ASC",
531                            )
532                            .map_err(process_sqlite_error)?;
533                        let rows = stmt
534                            .query_map(params![session_id], |row| {
535                                Ok((row.get::<_, String>(0)?, row.get::<_, String>(1)?))
536                            })
537                            .map_err(process_sqlite_error)?;
538                        let mut removed = Vec::new();
539                        for row in rows {
540                            let (process_id, record_json) = row.map_err(process_sqlite_error)?;
541                            let record: ProcessRecord =
542                                serde_json::from_str(&record_json).map_err(process_decode_error)?;
543                            removed.push((process_id, record));
544                        }
545                        removed
546                    };
547
548                    // Wake acknowledgements are process-scoped consumed-event markers.
549                    // Session deletion removes materialized session-addressed deliveries
550                    // through the session store; clearing these rows would re-expose
551                    // already-consumed wakes to surviving grants or future host readers.
552                    let deleted_wake_count = 0;
553                    let revoked_handle_count = tx
554                        .execute(
555                            "DELETE FROM process_handle_grants WHERE session_id = ?1",
556                            params![session_id],
557                        )
558                        .map_err(process_sqlite_error)?;
559                    let mut orphaned_process_ids = Vec::new();
560                    let mut preserved_process_ids = Vec::new();
561                    for (process_id, record) in removed {
562                        if record.is_terminal() {
563                            continue;
564                        }
565                        let remaining_grants: i64 = tx
566                            .query_row(
567                                "SELECT COUNT(*) FROM process_handle_grants WHERE process_id = ?1",
568                                params![process_id],
569                                |row| row.get(0),
570                            )
571                            .map_err(process_sqlite_error)?;
572                        if remaining_grants == 0 {
573                            orphaned_process_ids.push(process_id);
574                        } else {
575                            preserved_process_ids.push(process_id);
576                        }
577                    }
578                    let wake_targeted = {
579                        let mut stmt = tx
580                            .prepare("SELECT process_id, record_json FROM processes ORDER BY process_id ASC")
581                            .map_err(process_sqlite_error)?;
582                        let rows = stmt
583                            .query_map([], |row| {
584                                Ok((row.get::<_, String>(0)?, row.get::<_, String>(1)?))
585                            })
586                            .map_err(process_sqlite_error)?;
587                        let mut records = Vec::new();
588                        for row in rows {
589                            let (process_id, record_json) = row.map_err(process_sqlite_error)?;
590                            let record: ProcessRecord =
591                                serde_json::from_str(&record_json).map_err(process_decode_error)?;
592                            records.push((process_id, record));
593                        }
594                        records
595                    };
596                    for (_process_id, mut record) in wake_targeted {
597                        if record.clear_wake_target_for_session(&session_id) {
598                            Self::save_process_conn(tx, &record)?;
599                        }
600                    }
601                    Ok((
602                        revoked_handle_count,
603                        deleted_wake_count,
604                        orphaned_process_ids,
605                        preserved_process_ids,
606                    ))
607                })()))
608            })
609            .await
610            .map_err(process_sqlite_error)??;
611        orphaned_process_ids.sort();
612        orphaned_process_ids.dedup();
613        preserved_process_ids.sort();
614        preserved_process_ids.dedup();
615        Ok(lash_core::ProcessSessionDeleteReport {
616            session_id: session_id.to_string(),
617            revoked_handle_count,
618            deleted_wake_count,
619            orphaned_process_ids,
620            preserved_process_ids,
621        })
622    }
623
624    async fn append_event(
625        &self,
626        process_id: &str,
627        request: ProcessEventAppendRequest,
628    ) -> Result<ProcessEventAppendResult, lash_core::PluginError> {
629        let process_id = process_id.to_string();
630        let (result, appended) = self
631            .conn
632            .write_flow(move |tx| {
633                Ok(tx_outcome((|| {
634                    let mut record =
635                        Self::load_process_conn(tx, &process_id)?.ok_or_else(|| {
636                            lash_core::PluginError::Session(format!(
637                                "unknown process `{process_id}`"
638                            ))
639                        })?;
640                    let replay_lookup = if let Some(replay_key) =
641                        request.replay.as_ref().map(|replay| replay.key.as_str())
642                    {
643                        Self::load_event_by_key_conn(tx, &process_id, replay_key)?
644                    } else {
645                        None
646                    };
647                    let sequence = tx
648                        .query_row(
649                            "SELECT COALESCE(MAX(sequence), 0) + 1 FROM process_events WHERE process_id = ?1",
650                            params![process_id],
651                            |row| row.get::<_, i64>(0),
652                        )
653                        .map_err(process_sqlite_error)? as u64;
654                    let occurred_at_ms = current_epoch_ms();
655                    let prepared = prepare_process_event_append(
656                        &record,
657                        request,
658                        sequence,
659                        replay_lookup,
660                        occurred_at_ms,
661                    )?;
662                    match prepared {
663                        lash_core::ProcessEventAppendPlan::Replay {
664                            event,
665                            repair_status,
666                            wake_delivery,
667                            occurred_at_ms,
668                        } => {
669                            let repaired = if let Some(status) = repair_status {
670                                lash_core::apply_process_status_projection(
671                                    &mut record,
672                                    status,
673                                    occurred_at_ms,
674                                );
675                                Self::save_process_conn(tx, &record)?;
676                                true
677                            } else {
678                                false
679                            };
680                            Ok((
681                                ProcessEventAppendResult {
682                                    event,
683                                    wake_delivery,
684                                },
685                                repaired,
686                            ))
687                        }
688                        lash_core::ProcessEventAppendPlan::Insert {
689                            event,
690                            payload_hash,
691                            status_update,
692                            wake_delivery,
693                            occurred_at_ms,
694                        } => {
695                            tx.execute(
696                                "INSERT INTO process_events (
697                                    process_id, sequence, event_type, payload_hash, idempotency_key,
698                                    occurred_at_ms, event_json
699                                 )
700                                 VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7)",
701                                params![
702                                    process_id,
703                                    sequence as i64,
704                                    event.event_type.as_str(),
705                                    payload_hash.as_str(),
706                                    event.invocation.replay_key(),
707                                    occurred_at_ms as i64,
708                                    process_encode_json(&event)?,
709                                ],
710                            )
711                            .map_err(process_sqlite_error)?;
712                            if let Some(status) = status_update {
713                                lash_core::apply_process_status_projection(
714                                    &mut record,
715                                    status,
716                                    occurred_at_ms,
717                                );
718                            } else {
719                                record.updated_at_ms = occurred_at_ms;
720                            }
721                            Self::save_process_conn(tx, &record)?;
722                            Ok((
723                                ProcessEventAppendResult {
724                                    event,
725                                    wake_delivery,
726                                },
727                                true,
728                            ))
729                        }
730                    }
731                })()))
732            })
733            .await
734            .map_err(process_sqlite_error)??;
735        if appended {
736            self.notify.notify_waiters();
737        }
738        Ok(result)
739    }
740
741    async fn events_after(
742        &self,
743        process_id: &str,
744        after_sequence: u64,
745    ) -> Result<Vec<ProcessEvent>, lash_core::PluginError> {
746        let process_id = process_id.to_string();
747        self.conn
748            .call(move |conn| {
749                Ok((|| {
750                    if Self::load_process_conn(conn, &process_id)?.is_none() {
751                        return Err(lash_core::PluginError::Session(format!(
752                            "unknown process `{process_id}`"
753                        )));
754                    }
755                    let mut stmt = conn
756                        .prepare(
757                            "SELECT event_json FROM process_events
758                             WHERE process_id = ?1 AND sequence > ?2
759                             ORDER BY sequence ASC",
760                        )
761                        .map_err(process_sqlite_error)?;
762                    let rows = stmt
763                        .query_map(params![process_id, after_sequence as i64], |row| {
764                            row.get::<_, String>(0)
765                        })
766                        .map_err(process_sqlite_error)?;
767                    let mut events = Vec::new();
768                    for row in rows {
769                        events.push(
770                            serde_json::from_str(&row.map_err(process_sqlite_error)?)
771                                .map_err(process_decode_error)?,
772                        );
773                    }
774                    Ok(events)
775                })())
776            })
777            .await
778            .map_err(process_sqlite_error)?
779    }
780
781    async fn count_events_through(
782        &self,
783        process_id: &str,
784        event_type: &str,
785        up_to_sequence: u64,
786    ) -> Result<u64, lash_core::PluginError> {
787        let process_id = process_id.to_string();
788        let event_type = event_type.to_string();
789        self.conn
790            .call(move |conn| {
791                Ok((|| {
792                    if Self::load_process_conn(conn, &process_id)?.is_none() {
793                        return Err(lash_core::PluginError::Session(format!(
794                            "unknown process `{process_id}`"
795                        )));
796                    }
797                    conn.query_row(
798                        "SELECT COUNT(*) FROM process_events
799                         WHERE process_id = ?1 AND event_type = ?2 AND sequence <= ?3",
800                        params![process_id, event_type, up_to_sequence as i64],
801                        |row| row.get::<_, i64>(0),
802                    )
803                    .map(|count| count as u64)
804                    .map_err(process_sqlite_error)
805                })())
806            })
807            .await
808            .map_err(process_sqlite_error)?
809    }
810
811    async fn recent_events(
812        &self,
813        process_id: &str,
814        limit: usize,
815    ) -> Result<Vec<ProcessEvent>, lash_core::PluginError> {
816        let process_id = process_id.to_string();
817        self.conn
818            .call(move |conn| {
819                Ok((|| {
820                    if Self::load_process_conn(conn, &process_id)?.is_none() {
821                        return Err(lash_core::PluginError::Session(format!(
822                            "unknown process `{process_id}`"
823                        )));
824                    }
825                    let mut stmt = conn
826                        .prepare(
827                            "SELECT event_json FROM process_events
828                             WHERE process_id = ?1
829                             ORDER BY sequence DESC
830                             LIMIT ?2",
831                        )
832                        .map_err(process_sqlite_error)?;
833                    let rows = stmt
834                        .query_map(params![process_id, limit as i64], |row| {
835                            row.get::<_, String>(0)
836                        })
837                        .map_err(process_sqlite_error)?;
838                    let mut events: Vec<ProcessEvent> = Vec::new();
839                    for row in rows {
840                        events.push(
841                            serde_json::from_str(&row.map_err(process_sqlite_error)?)
842                                .map_err(process_decode_error)?,
843                        );
844                    }
845                    events.reverse();
846                    Ok(events)
847                })())
848            })
849            .await
850            .map_err(process_sqlite_error)?
851    }
852
853    async fn wake_events_after(
854        &self,
855        process_id: &str,
856        after_sequence: u64,
857    ) -> Result<Vec<ProcessEvent>, lash_core::PluginError> {
858        let acked: std::collections::HashSet<u64> = {
859            let process_id = process_id.to_string();
860            self.conn
861                .call(move |conn| {
862                    Ok(
863                        (|| -> Result<std::collections::HashSet<u64>, lash_core::PluginError> {
864                            let mut stmt = conn
865                                .prepare(
866                                    "SELECT sequence FROM process_wake_acks WHERE process_id = ?1",
867                                )
868                                .map_err(process_sqlite_error)?;
869                            let rows = stmt
870                                .query_map(params![process_id], |row| row.get::<_, i64>(0))
871                                .map_err(process_sqlite_error)?;
872                            let mut set = std::collections::HashSet::new();
873                            for row in rows {
874                                set.insert(row.map_err(process_sqlite_error)? as u64);
875                            }
876                            Ok(set)
877                        })(),
878                    )
879                })
880                .await
881                .map_err(process_sqlite_error)??
882        };
883        Ok(self
884            .events_after(process_id, after_sequence)
885            .await?
886            .into_iter()
887            .filter(|event| event.semantics.wake.is_some() && !acked.contains(&event.sequence))
888            .collect())
889    }
890
891    async fn wait_event_after(
892        &self,
893        process_id: &str,
894        event_type: &str,
895        after_sequence: u64,
896    ) -> Result<ProcessEvent, lash_core::PluginError> {
897        loop {
898            if let Some(event) = self
899                .events_after(process_id, after_sequence)
900                .await?
901                .into_iter()
902                .find(|event| event.event_type == event_type)
903            {
904                return Ok(event);
905            }
906            tokio::select! {
907                _ = self.notify.notified() => {}
908                _ = tokio::time::sleep(Duration::from_millis(50)) => {}
909            }
910        }
911    }
912
913    async fn await_process(
914        &self,
915        process_id: &str,
916    ) -> Result<ProcessAwaitOutput, lash_core::PluginError> {
917        loop {
918            let record = self.get_process(process_id).await.ok_or_else(|| {
919                lash_core::PluginError::Session(format!("unknown process `{process_id}`"))
920            })?;
921            if let Some(await_output) = record.status.await_output() {
922                return Ok(await_output.clone());
923            }
924            tokio::select! {
925                _ = self.notify.notified() => {}
926                _ = tokio::time::sleep(Duration::from_millis(50)) => {}
927            }
928        }
929    }
930
931    async fn complete_process(
932        &self,
933        process_id: &str,
934        await_output: ProcessAwaitOutput,
935    ) -> Result<ProcessRecord, lash_core::PluginError> {
936        let event_type = match await_output.terminal_state() {
937            lash_core::ProcessTerminalState::Completed => "process.completed",
938            lash_core::ProcessTerminalState::Failed => "process.failed",
939            lash_core::ProcessTerminalState::Cancelled => "process.cancelled",
940        };
941        self.append_event(
942            process_id,
943            ProcessEventAppendRequest::new(
944                event_type,
945                serde_json::json!({ "await_output": await_output }),
946            )
947            .with_replay_key(format!("process:{process_id}:terminal:{event_type}")),
948        )
949        .await?;
950        self.get_process(process_id).await.ok_or_else(|| {
951            lash_core::PluginError::Session(format!(
952                "unknown process `{process_id}` after terminal event"
953            ))
954        })
955    }
956
957    async fn set_process_wait(
958        &self,
959        process_id: &str,
960        wait: lash_core::WaitState,
961    ) -> Result<ProcessRecord, lash_core::PluginError> {
962        let process_id = process_id.to_string();
963        self.conn
964            .write_flow(move |tx| {
965                Ok(tx_outcome((|| {
966                    let mut record =
967                        Self::load_process_conn(tx, &process_id)?.ok_or_else(|| {
968                            lash_core::PluginError::Session(format!(
969                                "unknown process `{process_id}`"
970                            ))
971                        })?;
972                    if record.is_terminal() {
973                        return Err(lash_core::PluginError::Session(format!(
974                            "terminal process `{process_id}` cannot enter a wait state"
975                        )));
976                    }
977                    record.wait = Some(wait);
978                    record.updated_at_ms = current_epoch_ms();
979                    Self::save_process_conn(tx, &record)?;
980                    Ok(record)
981                })()))
982            })
983            .await
984            .map_err(process_sqlite_error)?
985    }
986
987    async fn clear_process_wait(
988        &self,
989        process_id: &str,
990    ) -> Result<ProcessRecord, lash_core::PluginError> {
991        let process_id = process_id.to_string();
992        self.conn
993            .write_flow(move |tx| {
994                Ok(tx_outcome((|| {
995                    let mut record =
996                        Self::load_process_conn(tx, &process_id)?.ok_or_else(|| {
997                            lash_core::PluginError::Session(format!(
998                                "unknown process `{process_id}`"
999                            ))
1000                        })?;
1001                    record.wait = None;
1002                    record.updated_at_ms = current_epoch_ms();
1003                    Self::save_process_conn(tx, &record)?;
1004                    Ok(record)
1005                })()))
1006            })
1007            .await
1008            .map_err(process_sqlite_error)?
1009    }
1010
1011    async fn get_process(&self, process_id: &str) -> Option<ProcessRecord> {
1012        let process_id = process_id.to_string();
1013        self.conn
1014            .call(move |conn| Ok(Self::load_process_conn(conn, &process_id).ok().flatten()))
1015            .await
1016            .ok()
1017            .flatten()
1018    }
1019
1020    async fn list_processes(
1021        &self,
1022        filter: &lash_core::ProcessListFilter,
1023    ) -> Result<Vec<ProcessRecord>, lash_core::PluginError> {
1024        let filter = filter.clone();
1025        self.conn
1026            .call(move |conn| {
1027                Ok((|| {
1028                    let mut stmt = conn
1029                        .prepare(
1030                            "SELECT record_json FROM processes
1031                             ORDER BY process_id ASC",
1032                        )
1033                        .map_err(process_sqlite_error)?;
1034                    let rows = stmt
1035                        .query_map([], |row| row.get::<_, String>(0))
1036                        .map_err(process_sqlite_error)?;
1037                    let mut records = Vec::new();
1038                    for row in rows {
1039                        let record: ProcessRecord =
1040                            serde_json::from_str(&row.map_err(process_sqlite_error)?)
1041                                .map_err(process_decode_error)?;
1042                        if filter.matches_record(&record) {
1043                            records.push(record);
1044                        }
1045                    }
1046                    Ok(records)
1047                })())
1048            })
1049            .await
1050            .map_err(process_sqlite_error)?
1051    }
1052
1053    async fn ack_wake(
1054        &self,
1055        process_id: &str,
1056        sequence: u64,
1057    ) -> Result<(), lash_core::PluginError> {
1058        let process_id = process_id.to_string();
1059        self.conn
1060            .call(move |conn| {
1061                Ok((|| {
1062                    if Self::load_process_conn(conn, &process_id)?.is_none() {
1063                        return Err(lash_core::PluginError::Session(format!(
1064                            "unknown process `{process_id}`"
1065                        )));
1066                    }
1067                    conn.execute(
1068                        "INSERT OR IGNORE INTO process_wake_acks (process_id, sequence) VALUES (?1, ?2)",
1069                        params![process_id, sequence as i64],
1070                    )
1071                    .map_err(process_sqlite_error)?;
1072                    Ok(())
1073                })())
1074            })
1075            .await
1076            .map_err(process_sqlite_error)?
1077    }
1078
1079    async fn list_non_terminal(&self) -> Result<Vec<ProcessRecord>, lash_core::PluginError> {
1080        self.conn
1081            .call(move |conn| {
1082                Ok((|| {
1083                    let mut stmt = conn
1084                        .prepare(
1085                            "SELECT record_json FROM processes
1086                             WHERE status = 'running'
1087                             ORDER BY process_id ASC",
1088                        )
1089                        .map_err(process_sqlite_error)?;
1090                    let rows = stmt
1091                        .query_map([], |row| row.get::<_, String>(0))
1092                        .map_err(process_sqlite_error)?;
1093                    let mut records = Vec::new();
1094                    for row in rows {
1095                        let record: ProcessRecord =
1096                            serde_json::from_str(&row.map_err(process_sqlite_error)?)
1097                                .map_err(process_decode_error)?;
1098                        records.push(record);
1099                    }
1100                    Ok(records)
1101                })())
1102            })
1103            .await
1104            .map_err(process_sqlite_error)?
1105    }
1106
1107    async fn claim_process_lease(
1108        &self,
1109        process_id: &str,
1110        owner_id: &str,
1111        lease_ttl_ms: u64,
1112    ) -> Result<ProcessLease, lash_core::PluginError> {
1113        let process_id = process_id.to_string();
1114        let owner_id = owner_id.to_string();
1115        self.conn
1116            .write_flow(move |tx| {
1117                Ok(tx_outcome((|| {
1118                    if Self::load_process_conn(tx, &process_id)?.is_none() {
1119                        return Err(lash_core::PluginError::Session(format!(
1120                            "unknown process `{process_id}`"
1121                        )));
1122                    }
1123                    let now = current_epoch_ms();
1124                    let current = Self::load_process_lease_conn(tx, &process_id)?;
1125                    if let Some(current) = current.as_ref()
1126                        && current.expires_at_epoch_ms > now
1127                        && current.owner_id != owner_id
1128                    {
1129                        return Err(process_lease_conflict(&process_id, current));
1130                    }
1131                    // Read the raw fencing token directly: a completed/abandoned
1132                    // lease nulls the owner/token columns but retains the
1133                    // monotonically-increasing `lease_fencing_token`, so a
1134                    // re-claim never reuses a stale writer's token.
1135                    let fencing_token: u64 = tx
1136                        .query_row(
1137                            "SELECT lease_fencing_token FROM process_leases WHERE process_id = ?1",
1138                            params![process_id],
1139                            |row| row.get::<_, i64>(0),
1140                        )
1141                        .optional()
1142                        .map_err(process_sqlite_error)?
1143                        .unwrap_or(0) as u64
1144                        + 1;
1145                    let lease = ProcessLease {
1146                        schema_version: PROCESS_LEASE_SCHEMA_VERSION,
1147                        process_id: process_id.clone(),
1148                        owner_id: owner_id.clone(),
1149                        lease_token: format!(
1150                            "{:x}",
1151                            Sha256::digest(
1152                                format!("{process_id}:{owner_id}:{now}:{fencing_token}").as_bytes()
1153                            )
1154                        ),
1155                        fencing_token,
1156                        claimed_at_epoch_ms: now,
1157                        expires_at_epoch_ms: now.saturating_add(lease_ttl_ms),
1158                    };
1159                    tx.execute(
1160                        "INSERT INTO process_leases (
1161                            process_id, lease_owner_id, lease_token, lease_fencing_token,
1162                            lease_claimed_at_ms, lease_expires_at_ms
1163                         )
1164                         VALUES (?1, ?2, ?3, ?4, ?5, ?6)
1165                         ON CONFLICT(process_id) DO UPDATE SET
1166                            lease_owner_id = excluded.lease_owner_id,
1167                            lease_token = excluded.lease_token,
1168                            lease_fencing_token = excluded.lease_fencing_token,
1169                            lease_claimed_at_ms = excluded.lease_claimed_at_ms,
1170                            lease_expires_at_ms = excluded.lease_expires_at_ms",
1171                        params![
1172                            lease.process_id.as_str(),
1173                            lease.owner_id.as_str(),
1174                            lease.lease_token.as_str(),
1175                            lease.fencing_token as i64,
1176                            lease.claimed_at_epoch_ms as i64,
1177                            lease.expires_at_epoch_ms as i64,
1178                        ],
1179                    )
1180                    .map_err(process_sqlite_error)?;
1181                    Ok(lease)
1182                })()))
1183            })
1184            .await
1185            .map_err(process_sqlite_error)?
1186    }
1187
1188    async fn renew_process_lease(
1189        &self,
1190        lease: &ProcessLease,
1191        lease_ttl_ms: u64,
1192    ) -> Result<ProcessLease, lash_core::PluginError> {
1193        let lease = lease.clone();
1194        self.conn
1195            .write_flow(move |tx| {
1196                Ok(tx_outcome((|| {
1197                    let now = current_epoch_ms();
1198                    let current = Self::load_process_lease_conn(tx, &lease.process_id)?;
1199                    if !guard_lease(current.as_ref(), &lease.lease_token, now) {
1200                        return Err(process_lease_expired(&lease.process_id));
1201                    }
1202                    let renewed = ProcessLease {
1203                        expires_at_epoch_ms: now.saturating_add(lease_ttl_ms),
1204                        ..lease.clone()
1205                    };
1206                    tx.execute(
1207                        "UPDATE process_leases
1208                         SET lease_expires_at_ms = ?2
1209                         WHERE process_id = ?1 AND lease_token = ?3",
1210                        params![
1211                            renewed.process_id.as_str(),
1212                            renewed.expires_at_epoch_ms as i64,
1213                            renewed.lease_token.as_str(),
1214                        ],
1215                    )
1216                    .map_err(process_sqlite_error)?;
1217                    Ok(renewed)
1218                })()))
1219            })
1220            .await
1221            .map_err(process_sqlite_error)?
1222    }
1223
1224    async fn complete_process_lease(
1225        &self,
1226        completion: &ProcessLeaseCompletion,
1227    ) -> Result<(), lash_core::PluginError> {
1228        let process_id = completion.process_id.clone();
1229        let lease_token = completion.lease_token.clone();
1230        self.conn
1231            .call(move |conn| {
1232                conn.execute(
1233                    "UPDATE process_leases
1234                     SET lease_owner_id = NULL,
1235                         lease_token = NULL,
1236                         lease_claimed_at_ms = 0,
1237                         lease_expires_at_ms = 0
1238                     WHERE process_id = ?1 AND lease_token = ?2",
1239                    params![process_id, lease_token],
1240                )
1241            })
1242            .await
1243            .map_err(process_sqlite_error)?;
1244        Ok(())
1245    }
1246}
1247
1248/// Loud, stable error for a fenced process-lease claim on the `PluginError`
1249/// channel the [`ProcessRegistry`] trait returns.
1250fn process_lease_conflict(process_id: &str, current: &ProcessLease) -> lash_core::PluginError {
1251    lash_core::PluginError::Session(format!(
1252        "process `{process_id}` is already leased by `{}` until {}",
1253        current.owner_id, current.expires_at_epoch_ms
1254    ))
1255}
1256
1257/// Loud, stable error for a superseded or expired process lease.
1258fn process_lease_expired(process_id: &str) -> lash_core::PluginError {
1259    lash_core::PluginError::Session(format!(
1260        "process lease for `{process_id}` is missing or expired"
1261    ))
1262}