Skip to main content

lash_sqlite_store/
process_registry.rs

1//! SQLite-backed [`ProcessRegistry`] (`SqliteProcessRegistry`).
2//!
3//! First-party SQLite implementation of the public async process-registry
4//! surface. Every DB body is a *synchronous* rusqlite closure handed
5//! to [`SqliteConnection::call`] (reads) or [`SqliteConnection::write_flow`]
6//! (read-then-write).
7//!
8//! ## Why `write_flow`, not `write`
9//!
10//! The registry's transactional methods produce a [`lash_core::PluginError`],
11//! not a `rusqlite::Error`. `SqliteConnection::write` rolls back only when the
12//! closure returns `Err(rusqlite::Error)`, so a logical `PluginError` (e.g. a
13//! registration-hash conflict) would otherwise *commit* the partial work. Each
14//! such method therefore runs its synchronous body returning
15//! `Result<T, PluginError>` and maps it to a [`TxOutcome`]: `Ok` ⇒
16//! `Commit(Ok(value))`, `Err` ⇒ `Rollback(Err(error))`. That preserves the
17//! prior behaviour of rolling back on every error while still carrying the
18//! `PluginError` back to the caller. The outer `rusqlite::Error` channel only
19//! carries genuine SQLite/connection failures, mapped via `process_sqlite_error`.
20//!
21//! The `*_conn` helpers are synchronous and take a `&rusqlite::Connection` so
22//! they compose inside either closure — including from within a `&Transaction`,
23//! which derefs to `&Connection`.
24
25use super::*;
26
27fn process_status_label(record: &ProcessRecord) -> &'static str {
28    record.status.label()
29}
30
31impl SqliteProcessRegistry {
32    pub async fn open(path: &Path) -> tokio_rusqlite::Result<Self> {
33        let conn = SqliteConnection::open(path).await?;
34        ensure_process_schema(&conn).await?;
35        apply_pragmas(&conn, StoreBacking::File).await?;
36        Ok(Self {
37            conn,
38            notify: tokio::sync::Notify::new(),
39        })
40    }
41
42    pub async fn memory() -> tokio_rusqlite::Result<Self> {
43        let conn = SqliteConnection::open_in_memory().await?;
44        ensure_process_schema(&conn).await?;
45        apply_pragmas(&conn, StoreBacking::Memory).await?;
46        Ok(Self {
47            conn,
48            notify: tokio::sync::Notify::new(),
49        })
50    }
51
52    fn load_process_conn(
53        conn: &Connection,
54        process_id: &str,
55    ) -> Result<Option<ProcessRecord>, lash_core::PluginError> {
56        let json: Option<String> = conn
57            .query_row(
58                "SELECT record_json FROM processes WHERE process_id = ?1",
59                params![process_id],
60                |row| row.get(0),
61            )
62            .optional()
63            .map_err(process_sqlite_error)?;
64        json.map(|json| serde_json::from_str(&json).map_err(process_decode_error))
65            .transpose()
66    }
67
68    fn save_process_conn(
69        conn: &Connection,
70        record: &ProcessRecord,
71    ) -> Result<(), lash_core::PluginError> {
72        conn.execute(
73            "UPDATE processes
74             SET updated_at_ms = ?2, status = ?3, record_json = ?4
75             WHERE process_id = ?1",
76            params![
77                record.id.as_str(),
78                record.updated_at_ms as i64,
79                process_status_label(record),
80                process_encode_json(record)?
81            ],
82        )
83        .map_err(process_sqlite_error)?;
84        Ok(())
85    }
86
87    fn load_event_by_key_conn(
88        conn: &Connection,
89        process_id: &str,
90        replay_key: &str,
91    ) -> Result<Option<(String, ProcessEvent)>, lash_core::PluginError> {
92        let row: Option<(String, String)> = conn
93            .query_row(
94                "SELECT payload_hash, event_json
95                 FROM process_events
96                 WHERE process_id = ?1 AND idempotency_key = ?2",
97                params![process_id, replay_key],
98                |row| Ok((row.get(0)?, row.get(1)?)),
99            )
100            .optional()
101            .map_err(process_sqlite_error)?;
102        row.map(|(hash, json)| {
103            serde_json::from_str(&json)
104                .map(|event| (hash, event))
105                .map_err(process_decode_error)
106        })
107        .transpose()
108    }
109
110    fn load_process_lease_conn(
111        conn: &Connection,
112        process_id: &str,
113    ) -> Result<Option<ProcessLease>, lash_core::PluginError> {
114        conn.query_row(
115            "SELECT lease_owner_id, lease_token, lease_fencing_token,
116                    lease_claimed_at_ms, lease_expires_at_ms
117             FROM process_leases
118             WHERE process_id = ?1",
119            params![process_id],
120            |row| {
121                let owner_id: Option<String> = row.get(0)?;
122                let lease_token: Option<String> = row.get(1)?;
123                let (Some(owner_id), Some(lease_token)) = (owner_id, lease_token) else {
124                    return Ok(None);
125                };
126                Ok(Some(ProcessLease {
127                    schema_version: PROCESS_LEASE_SCHEMA_VERSION,
128                    process_id: process_id.to_string(),
129                    owner_id,
130                    lease_token,
131                    fencing_token: row.get::<_, i64>(2)? as u64,
132                    claimed_at_epoch_ms: row.get::<_, i64>(3)? as u64,
133                    expires_at_epoch_ms: row.get::<_, i64>(4)? as u64,
134                }))
135            },
136        )
137        .optional()
138        .map(|lease| lease.flatten())
139        .map_err(process_sqlite_error)
140    }
141
142    fn list_grants_for_scope_conn(
143        conn: &Connection,
144        session_scope: &SessionScope,
145        live_only: bool,
146    ) -> Result<Vec<ProcessHandleGrantEntry>, lash_core::PluginError> {
147        let session_scope_id = session_scope.id();
148        let status_clause = if live_only {
149            "AND p.status = 'running'"
150        } else {
151            ""
152        };
153        let mut stmt = conn
154            .prepare(&format!(
155                "SELECT g.process_id, g.descriptor_json, p.record_json
156                 FROM process_handle_grants g
157                 JOIN processes p ON p.process_id = g.process_id
158                 WHERE g.scope_id = ?1 {status_clause}
159                 ORDER BY g.process_id ASC"
160            ))
161            .map_err(process_sqlite_error)?;
162        let rows = stmt
163            .query_map(params![session_scope_id.as_str()], |row| {
164                Ok((
165                    row.get::<_, String>(0)?,
166                    row.get::<_, String>(1)?,
167                    row.get::<_, String>(2)?,
168                ))
169            })
170            .map_err(process_sqlite_error)?;
171        let mut entries = Vec::new();
172        for row in rows {
173            let (process_id, descriptor_json, record_json) = row.map_err(process_sqlite_error)?;
174            let descriptor: ProcessHandleDescriptor =
175                serde_json::from_str(&descriptor_json).map_err(process_decode_error)?;
176            let record: ProcessRecord =
177                serde_json::from_str(&record_json).map_err(process_decode_error)?;
178            entries.push((
179                ProcessHandleGrant {
180                    session_id: session_scope.session_id.clone(),
181                    process_id,
182                    descriptor,
183                },
184                record,
185            ));
186        }
187        Ok(entries)
188    }
189}
190
191/// Map a `Result<T, PluginError>` produced by a synchronous transaction body to
192/// a [`TxOutcome`]: commit on success, roll back on logical error. Both arms
193/// carry the inner `Result` back so the caller recovers the value or the
194/// `PluginError` after the transaction resolves.
195fn tx_outcome<T>(
196    result: Result<T, lash_core::PluginError>,
197) -> TxOutcome<Result<T, lash_core::PluginError>> {
198    match result {
199        Ok(value) => TxOutcome::Commit(Ok(value)),
200        Err(err) => TxOutcome::Rollback(Err(err)),
201    }
202}
203
204#[async_trait::async_trait]
205impl ProcessRegistry for SqliteProcessRegistry {
206    fn durability_tier(&self) -> DurabilityTier {
207        DurabilityTier::Durable
208    }
209
210    async fn register_process(
211        &self,
212        registration: ProcessRegistration,
213    ) -> Result<ProcessRecord, lash_core::PluginError> {
214        let (registration, registration_hash) = prepare_process_registration(registration)?;
215        let record = self
216            .conn
217            .write_flow(move |tx| {
218                Ok(tx_outcome((|| {
219                    if let Some(existing) = Self::load_process_conn(tx, &registration.id)? {
220                        if existing.registration_hash == registration_hash {
221                            return Ok(existing);
222                        }
223                        return Err(lash_core::PluginError::Session(format!(
224                            "process `{}` registration hash conflict: existing {}, new {}",
225                            registration.id, existing.registration_hash, registration_hash
226                        )));
227                    }
228                    let now = current_epoch_ms();
229                    let record = ProcessRecord::from_prepared_registration(
230                        registration,
231                        registration_hash,
232                        now,
233                    );
234                    let originator_scope_id = record.originator_scope_id();
235                    tx.execute(
236                        "INSERT INTO processes (
237                            process_id, registration_hash, owner_scope_id,
238                            created_at_ms, updated_at_ms, status, record_json
239                         )
240                         VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7)",
241                        params![
242                            record.id.as_str(),
243                            record.registration_hash.as_str(),
244                            originator_scope_id.as_str(),
245                            record.created_at_ms as i64,
246                            record.updated_at_ms as i64,
247                            process_status_label(&record),
248                            process_encode_json(&record)?,
249                        ],
250                    )
251                    .map_err(process_sqlite_error)?;
252                    Ok(record)
253                })()))
254            })
255            .await
256            .map_err(process_sqlite_error)??;
257        self.notify.notify_waiters();
258        Ok(record)
259    }
260
261    async fn set_external_ref(
262        &self,
263        process_id: &str,
264        external_ref: ProcessExternalRef,
265    ) -> Result<ProcessRecord, lash_core::PluginError> {
266        let process_id = process_id.to_string();
267        let record = self
268            .conn
269            .write_flow(move |tx| {
270                Ok(tx_outcome((|| {
271                    let mut record =
272                        Self::load_process_conn(tx, &process_id)?.ok_or_else(|| {
273                            lash_core::PluginError::Session(format!(
274                                "unknown process `{process_id}`"
275                            ))
276                        })?;
277                    record.external_ref = Some(external_ref);
278                    record.updated_at_ms = current_epoch_ms();
279                    Self::save_process_conn(tx, &record)?;
280                    Ok(record)
281                })()))
282            })
283            .await
284            .map_err(process_sqlite_error)??;
285        self.notify.notify_waiters();
286        Ok(record)
287    }
288
289    async fn grant_handle(
290        &self,
291        session_scope: &SessionScope,
292        process_id: &str,
293        descriptor: ProcessHandleDescriptor,
294    ) -> Result<ProcessHandleGrant, lash_core::PluginError> {
295        let session_scope = session_scope.clone();
296        let process_id = process_id.to_string();
297        self.conn
298            .write_flow(move |tx| {
299                Ok(tx_outcome((|| {
300                    let session_scope_id = session_scope.id();
301                    if Self::load_process_conn(tx, &process_id)?.is_none() {
302                        return Err(lash_core::PluginError::Session(format!(
303                            "unknown process `{process_id}`"
304                        )));
305                    }
306                    tx.execute(
307                        "INSERT INTO process_handle_grants (session_id, scope_id, process_id, descriptor_json)
308                         VALUES (?1, ?2, ?3, ?4)
309                         ON CONFLICT(scope_id, process_id) DO UPDATE SET
310                            session_id = excluded.session_id,
311                            descriptor_json = excluded.descriptor_json",
312                        params![
313                            session_scope.session_id.as_str(),
314                            session_scope_id.as_str(),
315                            process_id.as_str(),
316                            process_encode_json(&descriptor)?
317                        ],
318                    )
319                    .map_err(process_sqlite_error)?;
320                    Ok(ProcessHandleGrant {
321                        session_id: session_scope.session_id.clone(),
322                        process_id: process_id.clone(),
323                        descriptor,
324                    })
325                })()))
326            })
327            .await
328            .map_err(process_sqlite_error)?
329    }
330
331    async fn revoke_handle(
332        &self,
333        session_scope: &SessionScope,
334        process_id: &str,
335    ) -> Result<(), lash_core::PluginError> {
336        let session_scope_id = session_scope.id().as_str().to_string();
337        let process_id = process_id.to_string();
338        self.conn
339            .call(move |conn| {
340                conn.execute(
341                    "DELETE FROM process_handle_grants WHERE scope_id = ?1 AND process_id = ?2",
342                    params![session_scope_id, process_id],
343                )
344            })
345            .await
346            .map_err(process_sqlite_error)?;
347        Ok(())
348    }
349
350    async fn transfer_handle_grants(
351        &self,
352        from_scope: &SessionScope,
353        to_scope: &SessionScope,
354        process_ids: &[String],
355    ) -> Result<(), lash_core::PluginError> {
356        let from_scope = from_scope.clone();
357        let to_scope = to_scope.clone();
358        let process_ids = process_ids.to_vec();
359        self.conn
360            .write_flow(move |tx| {
361                Ok(tx_outcome((|| {
362                    let from_scope_id = from_scope.id();
363                    let to_scope_id = to_scope.id();
364                    for process_id in &process_ids {
365                        let descriptor_json: Option<String> = tx
366                            .query_row(
367                                "SELECT descriptor_json
368                                 FROM process_handle_grants
369                                 WHERE scope_id = ?1 AND process_id = ?2",
370                                params![from_scope_id.as_str(), process_id.as_str()],
371                                |row| row.get(0),
372                            )
373                            .optional()
374                            .map_err(process_sqlite_error)?;
375                        let Some(descriptor_json) = descriptor_json else {
376                            return Err(lash_core::PluginError::Session(format!(
377                                "process handle `{process_id}` is not granted to session `{}`",
378                                from_scope.session_id
379                            )));
380                        };
381                        tx.execute(
382                            "DELETE FROM process_handle_grants
383                             WHERE scope_id = ?1 AND process_id = ?2",
384                            params![from_scope_id.as_str(), process_id.as_str()],
385                        )
386                        .map_err(process_sqlite_error)?;
387                        tx.execute(
388                            "INSERT INTO process_handle_grants (session_id, scope_id, process_id, descriptor_json)
389                             VALUES (?1, ?2, ?3, ?4)
390                             ON CONFLICT(scope_id, process_id) DO UPDATE SET
391                                session_id = excluded.session_id,
392                                descriptor_json = excluded.descriptor_json",
393                            params![
394                                to_scope.session_id.as_str(),
395                                to_scope_id.as_str(),
396                                process_id.as_str(),
397                                descriptor_json
398                            ],
399                        )
400                        .map_err(process_sqlite_error)?;
401                    }
402                    Ok(())
403                })()))
404            })
405            .await
406            .map_err(process_sqlite_error)?
407    }
408
409    async fn list_handle_grants(
410        &self,
411        session_scope: &SessionScope,
412    ) -> Result<Vec<ProcessHandleGrantEntry>, lash_core::PluginError> {
413        let session_scope = session_scope.clone();
414        self.conn
415            .call(move |conn| {
416                Ok(Self::list_grants_for_scope_conn(
417                    conn,
418                    &session_scope,
419                    false,
420                ))
421            })
422            .await
423            .map_err(process_sqlite_error)?
424    }
425
426    async fn list_live_handle_grants(
427        &self,
428        session_scope: &SessionScope,
429    ) -> Result<Vec<ProcessHandleGrantEntry>, lash_core::PluginError> {
430        let session_scope = session_scope.clone();
431        self.conn
432            .call(move |conn| Ok(Self::list_grants_for_scope_conn(conn, &session_scope, true)))
433            .await
434            .map_err(process_sqlite_error)?
435    }
436
437    async fn has_handle_grant(
438        &self,
439        session_scope: &SessionScope,
440        process_id: &str,
441    ) -> Result<bool, lash_core::PluginError> {
442        let session_scope_id = session_scope.id().as_str().to_string();
443        let process_id = process_id.to_string();
444        self.conn
445            .call(move |conn| {
446                let exists = conn
447                    .query_row(
448                        "SELECT 1
449                         FROM process_handle_grants g
450                         JOIN processes p ON p.process_id = g.process_id
451                         WHERE g.scope_id = ?1 AND g.process_id = ?2
452                         LIMIT 1",
453                        params![session_scope_id, process_id],
454                        |_| Ok(()),
455                    )
456                    .optional()?
457                    .is_some();
458                Ok(exists)
459            })
460            .await
461            .map_err(process_sqlite_error)
462    }
463
464    async fn handle_grants_for_process(
465        &self,
466        process_id: &str,
467    ) -> Result<Vec<ProcessHandleGrant>, lash_core::PluginError> {
468        let process_id = process_id.to_string();
469        self.conn
470            .call(move |conn| {
471                Ok((|| {
472                    if Self::load_process_conn(conn, &process_id)?.is_none() {
473                        return Err(lash_core::PluginError::Session(format!(
474                            "unknown process `{process_id}`"
475                        )));
476                    }
477                    let mut stmt = conn
478                        .prepare(
479                            "SELECT session_id, descriptor_json
480                             FROM process_handle_grants
481                             WHERE process_id = ?1
482                             ORDER BY session_id ASC, scope_id ASC",
483                        )
484                        .map_err(process_sqlite_error)?;
485                    let rows = stmt
486                        .query_map(params![process_id], |row| {
487                            Ok((row.get::<_, String>(0)?, row.get::<_, String>(1)?))
488                        })
489                        .map_err(process_sqlite_error)?;
490                    let mut grants = Vec::new();
491                    for row in rows {
492                        let (session_id, descriptor_json) = row.map_err(process_sqlite_error)?;
493                        let descriptor: ProcessHandleDescriptor =
494                            serde_json::from_str(&descriptor_json).map_err(process_decode_error)?;
495                        grants.push(ProcessHandleGrant {
496                            session_id,
497                            process_id: process_id.clone(),
498                            descriptor,
499                        });
500                    }
501                    Ok(grants)
502                })())
503            })
504            .await
505            .map_err(process_sqlite_error)?
506    }
507
508    async fn delete_session_process_state(
509        &self,
510        session_id: &str,
511    ) -> Result<lash_core::ProcessSessionDeleteReport, lash_core::PluginError> {
512        let session_id_owned = session_id.to_string();
513        let (
514            revoked_handle_count,
515            deleted_wake_count,
516            mut orphaned_process_ids,
517            mut preserved_process_ids,
518        ) = self
519            .conn
520            .write_flow(move |tx| {
521                Ok(tx_outcome((|| {
522                    let session_id = session_id_owned;
523                    let removed = {
524                        let mut stmt = tx
525                            .prepare(
526                                "SELECT g.process_id, p.record_json
527                                 FROM process_handle_grants g
528                                 JOIN processes p ON p.process_id = g.process_id
529                                 WHERE g.session_id = ?1
530                                 ORDER BY g.process_id ASC",
531                            )
532                            .map_err(process_sqlite_error)?;
533                        let rows = stmt
534                            .query_map(params![session_id], |row| {
535                                Ok((row.get::<_, String>(0)?, row.get::<_, String>(1)?))
536                            })
537                            .map_err(process_sqlite_error)?;
538                        let mut removed = Vec::new();
539                        for row in rows {
540                            let (process_id, record_json) = row.map_err(process_sqlite_error)?;
541                            let record: ProcessRecord =
542                                serde_json::from_str(&record_json).map_err(process_decode_error)?;
543                            removed.push((process_id, record));
544                        }
545                        removed
546                    };
547
548                    // Wake acknowledgements are process-scoped consumed-event markers.
549                    // Session deletion removes materialized session-addressed deliveries
550                    // through the session store; clearing these rows would re-expose
551                    // already-consumed wakes to surviving grants or future host readers.
552                    let deleted_wake_count = 0;
553                    let revoked_handle_count = tx
554                        .execute(
555                            "DELETE FROM process_handle_grants WHERE session_id = ?1",
556                            params![session_id],
557                        )
558                        .map_err(process_sqlite_error)?;
559                    let mut orphaned_process_ids = Vec::new();
560                    let mut preserved_process_ids = Vec::new();
561                    for (process_id, record) in removed {
562                        if record.is_terminal() {
563                            continue;
564                        }
565                        let remaining_grants: i64 = tx
566                            .query_row(
567                                "SELECT COUNT(*) FROM process_handle_grants WHERE process_id = ?1",
568                                params![process_id],
569                                |row| row.get(0),
570                            )
571                            .map_err(process_sqlite_error)?;
572                        if remaining_grants == 0 {
573                            orphaned_process_ids.push(process_id);
574                        } else {
575                            preserved_process_ids.push(process_id);
576                        }
577                    }
578                    Ok((
579                        revoked_handle_count,
580                        deleted_wake_count,
581                        orphaned_process_ids,
582                        preserved_process_ids,
583                    ))
584                })()))
585            })
586            .await
587            .map_err(process_sqlite_error)??;
588        orphaned_process_ids.sort();
589        orphaned_process_ids.dedup();
590        preserved_process_ids.sort();
591        preserved_process_ids.dedup();
592        Ok(lash_core::ProcessSessionDeleteReport {
593            session_id: session_id.to_string(),
594            revoked_handle_count,
595            deleted_wake_count,
596            orphaned_process_ids,
597            preserved_process_ids,
598        })
599    }
600
601    async fn append_event(
602        &self,
603        process_id: &str,
604        request: ProcessEventAppendRequest,
605    ) -> Result<ProcessEventAppendResult, lash_core::PluginError> {
606        let process_id = process_id.to_string();
607        let (result, appended) = self
608            .conn
609            .write_flow(move |tx| {
610                Ok(tx_outcome((|| {
611                    let mut record =
612                        Self::load_process_conn(tx, &process_id)?.ok_or_else(|| {
613                            lash_core::PluginError::Session(format!(
614                                "unknown process `{process_id}`"
615                            ))
616                        })?;
617                    let replay_lookup = if let Some(replay_key) =
618                        request.replay.as_ref().map(|replay| replay.key.as_str())
619                    {
620                        Self::load_event_by_key_conn(tx, &process_id, replay_key)?
621                    } else {
622                        None
623                    };
624                    let sequence = tx
625                        .query_row(
626                            "SELECT COALESCE(MAX(sequence), 0) + 1 FROM process_events WHERE process_id = ?1",
627                            params![process_id],
628                            |row| row.get::<_, i64>(0),
629                        )
630                        .map_err(process_sqlite_error)? as u64;
631                    let occurred_at_ms = current_epoch_ms();
632                    let prepared = prepare_process_event_append(
633                        &record,
634                        request,
635                        sequence,
636                        replay_lookup,
637                        occurred_at_ms,
638                    )?;
639                    if prepared.replayed {
640                        return Ok((
641                            ProcessEventAppendResult {
642                                event: prepared.event,
643                                wake_delivery: prepared.wake_delivery,
644                            },
645                            false,
646                        ));
647                    }
648                    let event = prepared.event;
649                    tx.execute(
650                        "INSERT INTO process_events (
651                            process_id, sequence, event_type, payload_hash, idempotency_key,
652                            occurred_at_ms, event_json
653                         )
654                         VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7)",
655                        params![
656                            process_id,
657                            sequence as i64,
658                            event.event_type.as_str(),
659                            prepared.payload_hash.as_str(),
660                            event.invocation.replay_key(),
661                            prepared.occurred_at_ms as i64,
662                            process_encode_json(&event)?,
663                        ],
664                    )
665                    .map_err(process_sqlite_error)?;
666                    if let Some(status) = prepared.status_update.clone() {
667                        record.status = status;
668                        if record.status.is_terminal() {
669                            record.wait = None;
670                        }
671                    }
672                    record.updated_at_ms = prepared.occurred_at_ms;
673                    Self::save_process_conn(tx, &record)?;
674                    Ok((
675                        ProcessEventAppendResult {
676                            event,
677                            wake_delivery: prepared.wake_delivery,
678                        },
679                        true,
680                    ))
681                })()))
682            })
683            .await
684            .map_err(process_sqlite_error)??;
685        if appended {
686            self.notify.notify_waiters();
687        }
688        Ok(result)
689    }
690
691    async fn events_after(
692        &self,
693        process_id: &str,
694        after_sequence: u64,
695    ) -> Result<Vec<ProcessEvent>, lash_core::PluginError> {
696        let process_id = process_id.to_string();
697        self.conn
698            .call(move |conn| {
699                Ok((|| {
700                    if Self::load_process_conn(conn, &process_id)?.is_none() {
701                        return Err(lash_core::PluginError::Session(format!(
702                            "unknown process `{process_id}`"
703                        )));
704                    }
705                    let mut stmt = conn
706                        .prepare(
707                            "SELECT event_json FROM process_events
708                             WHERE process_id = ?1 AND sequence > ?2
709                             ORDER BY sequence ASC",
710                        )
711                        .map_err(process_sqlite_error)?;
712                    let rows = stmt
713                        .query_map(params![process_id, after_sequence as i64], |row| {
714                            row.get::<_, String>(0)
715                        })
716                        .map_err(process_sqlite_error)?;
717                    let mut events = Vec::new();
718                    for row in rows {
719                        events.push(
720                            serde_json::from_str(&row.map_err(process_sqlite_error)?)
721                                .map_err(process_decode_error)?,
722                        );
723                    }
724                    Ok(events)
725                })())
726            })
727            .await
728            .map_err(process_sqlite_error)?
729    }
730
731    async fn count_events_through(
732        &self,
733        process_id: &str,
734        event_type: &str,
735        up_to_sequence: u64,
736    ) -> Result<u64, lash_core::PluginError> {
737        let process_id = process_id.to_string();
738        let event_type = event_type.to_string();
739        self.conn
740            .call(move |conn| {
741                Ok((|| {
742                    if Self::load_process_conn(conn, &process_id)?.is_none() {
743                        return Err(lash_core::PluginError::Session(format!(
744                            "unknown process `{process_id}`"
745                        )));
746                    }
747                    conn.query_row(
748                        "SELECT COUNT(*) FROM process_events
749                         WHERE process_id = ?1 AND event_type = ?2 AND sequence <= ?3",
750                        params![process_id, event_type, up_to_sequence as i64],
751                        |row| row.get::<_, i64>(0),
752                    )
753                    .map(|count| count as u64)
754                    .map_err(process_sqlite_error)
755                })())
756            })
757            .await
758            .map_err(process_sqlite_error)?
759    }
760
761    async fn recent_events(
762        &self,
763        process_id: &str,
764        limit: usize,
765    ) -> Result<Vec<ProcessEvent>, lash_core::PluginError> {
766        let process_id = process_id.to_string();
767        self.conn
768            .call(move |conn| {
769                Ok((|| {
770                    if Self::load_process_conn(conn, &process_id)?.is_none() {
771                        return Err(lash_core::PluginError::Session(format!(
772                            "unknown process `{process_id}`"
773                        )));
774                    }
775                    let mut stmt = conn
776                        .prepare(
777                            "SELECT event_json FROM process_events
778                             WHERE process_id = ?1
779                             ORDER BY sequence DESC
780                             LIMIT ?2",
781                        )
782                        .map_err(process_sqlite_error)?;
783                    let rows = stmt
784                        .query_map(params![process_id, limit as i64], |row| {
785                            row.get::<_, String>(0)
786                        })
787                        .map_err(process_sqlite_error)?;
788                    let mut events: Vec<ProcessEvent> = Vec::new();
789                    for row in rows {
790                        events.push(
791                            serde_json::from_str(&row.map_err(process_sqlite_error)?)
792                                .map_err(process_decode_error)?,
793                        );
794                    }
795                    events.reverse();
796                    Ok(events)
797                })())
798            })
799            .await
800            .map_err(process_sqlite_error)?
801    }
802
803    async fn wake_events_after(
804        &self,
805        process_id: &str,
806        after_sequence: u64,
807    ) -> Result<Vec<ProcessEvent>, lash_core::PluginError> {
808        let acked: std::collections::HashSet<u64> = {
809            let process_id = process_id.to_string();
810            self.conn
811                .call(move |conn| {
812                    Ok(
813                        (|| -> Result<std::collections::HashSet<u64>, lash_core::PluginError> {
814                            let mut stmt = conn
815                                .prepare(
816                                    "SELECT sequence FROM process_wake_acks WHERE process_id = ?1",
817                                )
818                                .map_err(process_sqlite_error)?;
819                            let rows = stmt
820                                .query_map(params![process_id], |row| row.get::<_, i64>(0))
821                                .map_err(process_sqlite_error)?;
822                            let mut set = std::collections::HashSet::new();
823                            for row in rows {
824                                set.insert(row.map_err(process_sqlite_error)? as u64);
825                            }
826                            Ok(set)
827                        })(),
828                    )
829                })
830                .await
831                .map_err(process_sqlite_error)??
832        };
833        Ok(self
834            .events_after(process_id, after_sequence)
835            .await?
836            .into_iter()
837            .filter(|event| event.semantics.wake.is_some() && !acked.contains(&event.sequence))
838            .collect())
839    }
840
841    async fn wait_event_after(
842        &self,
843        process_id: &str,
844        event_type: &str,
845        after_sequence: u64,
846    ) -> Result<ProcessEvent, lash_core::PluginError> {
847        loop {
848            if let Some(event) = self
849                .events_after(process_id, after_sequence)
850                .await?
851                .into_iter()
852                .find(|event| event.event_type == event_type)
853            {
854                return Ok(event);
855            }
856            tokio::select! {
857                _ = self.notify.notified() => {}
858                _ = tokio::time::sleep(Duration::from_millis(50)) => {}
859            }
860        }
861    }
862
863    async fn await_process(
864        &self,
865        process_id: &str,
866    ) -> Result<ProcessAwaitOutput, lash_core::PluginError> {
867        loop {
868            let record = self.get_process(process_id).await.ok_or_else(|| {
869                lash_core::PluginError::Session(format!("unknown process `{process_id}`"))
870            })?;
871            if let Some(await_output) = record.status.await_output() {
872                return Ok(await_output.clone());
873            }
874            tokio::select! {
875                _ = self.notify.notified() => {}
876                _ = tokio::time::sleep(Duration::from_millis(50)) => {}
877            }
878        }
879    }
880
881    async fn complete_process(
882        &self,
883        process_id: &str,
884        await_output: ProcessAwaitOutput,
885    ) -> Result<ProcessRecord, lash_core::PluginError> {
886        let event_type = match await_output.terminal_state() {
887            lash_core::ProcessTerminalState::Completed => "process.completed",
888            lash_core::ProcessTerminalState::Failed => "process.failed",
889            lash_core::ProcessTerminalState::Cancelled => "process.cancelled",
890        };
891        self.append_event(
892            process_id,
893            ProcessEventAppendRequest::new(
894                event_type,
895                serde_json::json!({ "await_output": await_output }),
896            )
897            .with_replay_key(format!("process:{process_id}:terminal:{event_type}")),
898        )
899        .await?;
900        self.get_process(process_id).await.ok_or_else(|| {
901            lash_core::PluginError::Session(format!(
902                "unknown process `{process_id}` after terminal event"
903            ))
904        })
905    }
906
907    async fn set_process_wait(
908        &self,
909        process_id: &str,
910        wait: lash_core::WaitState,
911    ) -> Result<ProcessRecord, lash_core::PluginError> {
912        let process_id = process_id.to_string();
913        self.conn
914            .write_flow(move |tx| {
915                Ok(tx_outcome((|| {
916                    let mut record =
917                        Self::load_process_conn(tx, &process_id)?.ok_or_else(|| {
918                            lash_core::PluginError::Session(format!(
919                                "unknown process `{process_id}`"
920                            ))
921                        })?;
922                    if record.is_terminal() {
923                        return Err(lash_core::PluginError::Session(format!(
924                            "terminal process `{process_id}` cannot enter a wait state"
925                        )));
926                    }
927                    record.wait = Some(wait);
928                    record.updated_at_ms = current_epoch_ms();
929                    Self::save_process_conn(tx, &record)?;
930                    Ok(record)
931                })()))
932            })
933            .await
934            .map_err(process_sqlite_error)?
935    }
936
937    async fn clear_process_wait(
938        &self,
939        process_id: &str,
940    ) -> Result<ProcessRecord, lash_core::PluginError> {
941        let process_id = process_id.to_string();
942        self.conn
943            .write_flow(move |tx| {
944                Ok(tx_outcome((|| {
945                    let mut record =
946                        Self::load_process_conn(tx, &process_id)?.ok_or_else(|| {
947                            lash_core::PluginError::Session(format!(
948                                "unknown process `{process_id}`"
949                            ))
950                        })?;
951                    record.wait = None;
952                    record.updated_at_ms = current_epoch_ms();
953                    Self::save_process_conn(tx, &record)?;
954                    Ok(record)
955                })()))
956            })
957            .await
958            .map_err(process_sqlite_error)?
959    }
960
961    async fn get_process(&self, process_id: &str) -> Option<ProcessRecord> {
962        let process_id = process_id.to_string();
963        self.conn
964            .call(move |conn| Ok(Self::load_process_conn(conn, &process_id).ok().flatten()))
965            .await
966            .ok()
967            .flatten()
968    }
969
970    async fn list_processes(
971        &self,
972        filter: &lash_core::ProcessListFilter,
973    ) -> Result<Vec<ProcessRecord>, lash_core::PluginError> {
974        let filter = filter.clone();
975        self.conn
976            .call(move |conn| {
977                Ok((|| {
978                    let mut stmt = conn
979                        .prepare(
980                            "SELECT record_json FROM processes
981                             ORDER BY process_id ASC",
982                        )
983                        .map_err(process_sqlite_error)?;
984                    let rows = stmt
985                        .query_map([], |row| row.get::<_, String>(0))
986                        .map_err(process_sqlite_error)?;
987                    let mut records = Vec::new();
988                    for row in rows {
989                        let record: ProcessRecord =
990                            serde_json::from_str(&row.map_err(process_sqlite_error)?)
991                                .map_err(process_decode_error)?;
992                        if filter.matches_record(&record) {
993                            records.push(record);
994                        }
995                    }
996                    Ok(records)
997                })())
998            })
999            .await
1000            .map_err(process_sqlite_error)?
1001    }
1002
1003    async fn ack_wake(
1004        &self,
1005        process_id: &str,
1006        sequence: u64,
1007    ) -> Result<(), lash_core::PluginError> {
1008        let process_id = process_id.to_string();
1009        self.conn
1010            .call(move |conn| {
1011                Ok((|| {
1012                    if Self::load_process_conn(conn, &process_id)?.is_none() {
1013                        return Err(lash_core::PluginError::Session(format!(
1014                            "unknown process `{process_id}`"
1015                        )));
1016                    }
1017                    conn.execute(
1018                        "INSERT OR IGNORE INTO process_wake_acks (process_id, sequence) VALUES (?1, ?2)",
1019                        params![process_id, sequence as i64],
1020                    )
1021                    .map_err(process_sqlite_error)?;
1022                    Ok(())
1023                })())
1024            })
1025            .await
1026            .map_err(process_sqlite_error)?
1027    }
1028
1029    async fn list_non_terminal(&self) -> Result<Vec<ProcessRecord>, lash_core::PluginError> {
1030        self.conn
1031            .call(move |conn| {
1032                Ok((|| {
1033                    let mut stmt = conn
1034                        .prepare(
1035                            "SELECT record_json FROM processes
1036                             WHERE status = 'running'
1037                             ORDER BY process_id ASC",
1038                        )
1039                        .map_err(process_sqlite_error)?;
1040                    let rows = stmt
1041                        .query_map([], |row| row.get::<_, String>(0))
1042                        .map_err(process_sqlite_error)?;
1043                    let mut records = Vec::new();
1044                    for row in rows {
1045                        let record: ProcessRecord =
1046                            serde_json::from_str(&row.map_err(process_sqlite_error)?)
1047                                .map_err(process_decode_error)?;
1048                        records.push(record);
1049                    }
1050                    Ok(records)
1051                })())
1052            })
1053            .await
1054            .map_err(process_sqlite_error)?
1055    }
1056
1057    async fn claim_process_lease(
1058        &self,
1059        process_id: &str,
1060        owner_id: &str,
1061        lease_ttl_ms: u64,
1062    ) -> Result<ProcessLease, lash_core::PluginError> {
1063        let process_id = process_id.to_string();
1064        let owner_id = owner_id.to_string();
1065        self.conn
1066            .write_flow(move |tx| {
1067                Ok(tx_outcome((|| {
1068                    if Self::load_process_conn(tx, &process_id)?.is_none() {
1069                        return Err(lash_core::PluginError::Session(format!(
1070                            "unknown process `{process_id}`"
1071                        )));
1072                    }
1073                    let now = current_epoch_ms();
1074                    let current = Self::load_process_lease_conn(tx, &process_id)?;
1075                    if let Some(current) = current.as_ref()
1076                        && current.expires_at_epoch_ms > now
1077                        && current.owner_id != owner_id
1078                    {
1079                        return Err(process_lease_conflict(&process_id, current));
1080                    }
1081                    // Read the raw fencing token directly: a completed/abandoned
1082                    // lease nulls the owner/token columns but retains the
1083                    // monotonically-increasing `lease_fencing_token`, so a
1084                    // re-claim never reuses a stale writer's token.
1085                    let fencing_token: u64 = tx
1086                        .query_row(
1087                            "SELECT lease_fencing_token FROM process_leases WHERE process_id = ?1",
1088                            params![process_id],
1089                            |row| row.get::<_, i64>(0),
1090                        )
1091                        .optional()
1092                        .map_err(process_sqlite_error)?
1093                        .unwrap_or(0) as u64
1094                        + 1;
1095                    let lease = ProcessLease {
1096                        schema_version: PROCESS_LEASE_SCHEMA_VERSION,
1097                        process_id: process_id.clone(),
1098                        owner_id: owner_id.clone(),
1099                        lease_token: format!(
1100                            "{:x}",
1101                            Sha256::digest(
1102                                format!("{process_id}:{owner_id}:{now}:{fencing_token}").as_bytes()
1103                            )
1104                        ),
1105                        fencing_token,
1106                        claimed_at_epoch_ms: now,
1107                        expires_at_epoch_ms: now.saturating_add(lease_ttl_ms),
1108                    };
1109                    tx.execute(
1110                        "INSERT INTO process_leases (
1111                            process_id, lease_owner_id, lease_token, lease_fencing_token,
1112                            lease_claimed_at_ms, lease_expires_at_ms
1113                         )
1114                         VALUES (?1, ?2, ?3, ?4, ?5, ?6)
1115                         ON CONFLICT(process_id) DO UPDATE SET
1116                            lease_owner_id = excluded.lease_owner_id,
1117                            lease_token = excluded.lease_token,
1118                            lease_fencing_token = excluded.lease_fencing_token,
1119                            lease_claimed_at_ms = excluded.lease_claimed_at_ms,
1120                            lease_expires_at_ms = excluded.lease_expires_at_ms",
1121                        params![
1122                            lease.process_id.as_str(),
1123                            lease.owner_id.as_str(),
1124                            lease.lease_token.as_str(),
1125                            lease.fencing_token as i64,
1126                            lease.claimed_at_epoch_ms as i64,
1127                            lease.expires_at_epoch_ms as i64,
1128                        ],
1129                    )
1130                    .map_err(process_sqlite_error)?;
1131                    Ok(lease)
1132                })()))
1133            })
1134            .await
1135            .map_err(process_sqlite_error)?
1136    }
1137
1138    async fn renew_process_lease(
1139        &self,
1140        lease: &ProcessLease,
1141        lease_ttl_ms: u64,
1142    ) -> Result<ProcessLease, lash_core::PluginError> {
1143        let lease = lease.clone();
1144        self.conn
1145            .write_flow(move |tx| {
1146                Ok(tx_outcome((|| {
1147                    let now = current_epoch_ms();
1148                    let current = Self::load_process_lease_conn(tx, &lease.process_id)?;
1149                    if !guard_lease(current.as_ref(), &lease.lease_token, now) {
1150                        return Err(process_lease_expired(&lease.process_id));
1151                    }
1152                    let renewed = ProcessLease {
1153                        expires_at_epoch_ms: now.saturating_add(lease_ttl_ms),
1154                        ..lease.clone()
1155                    };
1156                    tx.execute(
1157                        "UPDATE process_leases
1158                         SET lease_expires_at_ms = ?2
1159                         WHERE process_id = ?1 AND lease_token = ?3",
1160                        params![
1161                            renewed.process_id.as_str(),
1162                            renewed.expires_at_epoch_ms as i64,
1163                            renewed.lease_token.as_str(),
1164                        ],
1165                    )
1166                    .map_err(process_sqlite_error)?;
1167                    Ok(renewed)
1168                })()))
1169            })
1170            .await
1171            .map_err(process_sqlite_error)?
1172    }
1173
1174    async fn complete_process_lease(
1175        &self,
1176        completion: &ProcessLeaseCompletion,
1177    ) -> Result<(), lash_core::PluginError> {
1178        let process_id = completion.process_id.clone();
1179        let lease_token = completion.lease_token.clone();
1180        self.conn
1181            .call(move |conn| {
1182                conn.execute(
1183                    "UPDATE process_leases
1184                     SET lease_owner_id = NULL,
1185                         lease_token = NULL,
1186                         lease_claimed_at_ms = 0,
1187                         lease_expires_at_ms = 0
1188                     WHERE process_id = ?1 AND lease_token = ?2",
1189                    params![process_id, lease_token],
1190                )
1191            })
1192            .await
1193            .map_err(process_sqlite_error)?;
1194        Ok(())
1195    }
1196}
1197
1198/// Loud, stable error for a fenced process-lease claim on the `PluginError`
1199/// channel the [`ProcessRegistry`] trait returns.
1200fn process_lease_conflict(process_id: &str, current: &ProcessLease) -> lash_core::PluginError {
1201    lash_core::PluginError::Session(format!(
1202        "process `{process_id}` is already leased by `{}` until {}",
1203        current.owner_id, current.expires_at_epoch_ms
1204    ))
1205}
1206
1207/// Loud, stable error for a superseded or expired process lease.
1208fn process_lease_expired(process_id: &str) -> lash_core::PluginError {
1209    lash_core::PluginError::Session(format!(
1210        "process lease for `{process_id}` is missing or expired"
1211    ))
1212}