Skip to main content

lash_sqlite_store/
process_registry.rs

1//! SQLite-backed [`ProcessRegistry`] (`SqliteProcessRegistry`).
2//!
3//! First-party SQLite implementation of the public async process-registry
4//! surface. Every DB body is a *synchronous* rusqlite closure handed
5//! to [`SqliteConnection::call`] (reads) or [`SqliteConnection::write_flow`]
6//! (read-then-write).
7//!
8//! ## Why `write_flow`, not `write`
9//!
10//! The registry's transactional methods produce a [`lash_core::PluginError`],
11//! not a `rusqlite::Error`. `SqliteConnection::write` rolls back only when the
12//! closure returns `Err(rusqlite::Error)`, so a logical `PluginError` (e.g. a
13//! registration-hash conflict) would otherwise *commit* the partial work. Each
14//! such method therefore runs its synchronous body returning
15//! `Result<T, PluginError>` and maps it to a [`TxOutcome`]: `Ok` ⇒
16//! `Commit(Ok(value))`, `Err` ⇒ `Rollback(Err(error))`. That preserves the
17//! prior behaviour of rolling back on every error while still carrying the
18//! `PluginError` back to the caller. The outer `rusqlite::Error` channel only
19//! carries genuine SQLite/connection failures, mapped via `process_sqlite_error`.
20//!
21//! The `*_conn` helpers are synchronous and take a `&rusqlite::Connection` so
22//! they compose inside either closure — including from within a `&Transaction`,
23//! which derefs to `&Connection`.
24
25use super::*;
26
27fn process_status_label(record: &ProcessRecord) -> &'static str {
28    record.status.label()
29}
30
31impl SqliteProcessRegistry {
32    pub async fn open(path: &Path) -> tokio_rusqlite::Result<Self> {
33        let conn = SqliteConnection::open(path).await?;
34        ensure_process_schema(&conn).await?;
35        apply_pragmas(&conn, StoreBacking::File).await?;
36        Ok(Self { conn })
37    }
38
39    pub async fn memory() -> tokio_rusqlite::Result<Self> {
40        let conn = SqliteConnection::open_in_memory().await?;
41        ensure_process_schema(&conn).await?;
42        apply_pragmas(&conn, StoreBacking::Memory).await?;
43        Ok(Self { conn })
44    }
45
46    fn load_process_conn(
47        conn: &Connection,
48        process_id: &str,
49    ) -> Result<Option<ProcessRecord>, lash_core::PluginError> {
50        let json: Option<String> = conn
51            .query_row(
52                "SELECT record_json FROM processes WHERE process_id = ?1",
53                params![process_id],
54                |row| row.get(0),
55            )
56            .optional()
57            .map_err(process_sqlite_error)?;
58        json.map(|json| serde_json::from_str(&json).map_err(process_decode_error))
59            .transpose()
60    }
61
62    fn save_process_conn(
63        conn: &Connection,
64        record: &ProcessRecord,
65    ) -> Result<(), lash_core::PluginError> {
66        conn.execute(
67            "UPDATE processes
68             SET updated_at_ms = ?2, status = ?3, record_json = ?4
69             WHERE process_id = ?1",
70            params![
71                record.id.as_str(),
72                record.updated_at_ms as i64,
73                process_status_label(record),
74                process_encode_json(record)?
75            ],
76        )
77        .map_err(process_sqlite_error)?;
78        Ok(())
79    }
80
81    fn load_event_by_key_conn(
82        conn: &Connection,
83        process_id: &str,
84        replay_key: &str,
85    ) -> Result<Option<(String, ProcessEvent)>, lash_core::PluginError> {
86        let row: Option<(String, String)> = conn
87            .query_row(
88                "SELECT payload_hash, event_json
89                 FROM process_events
90                 WHERE process_id = ?1 AND idempotency_key = ?2",
91                params![process_id, replay_key],
92                |row| Ok((row.get(0)?, row.get(1)?)),
93            )
94            .optional()
95            .map_err(process_sqlite_error)?;
96        row.map(|(hash, json)| {
97            serde_json::from_str(&json)
98                .map(|event| (hash, event))
99                .map_err(process_decode_error)
100        })
101        .transpose()
102    }
103
104    fn load_process_lease_conn(
105        conn: &Connection,
106        process_id: &str,
107    ) -> Result<Option<ProcessLease>, lash_core::PluginError> {
108        conn.query_row(
109            "SELECT lease_owner_id, lease_token, lease_fencing_token,
110                    lease_claimed_at_ms, lease_expires_at_ms,
111                    lease_owner_incarnation_id, lease_owner_liveness_json
112             FROM process_leases
113             WHERE process_id = ?1",
114            params![process_id],
115            |row| {
116                let owner_id: Option<String> = row.get(0)?;
117                let lease_token: Option<String> = row.get(1)?;
118                let incarnation_id: Option<String> = row.get(5)?;
119                let liveness_json: Option<String> = row.get(6)?;
120                let (Some(owner_id), Some(lease_token)) = (owner_id, lease_token) else {
121                    return Ok(None);
122                };
123                Ok(Some(ProcessLease {
124                    schema_version: PROCESS_LEASE_SCHEMA_VERSION,
125                    process_id: process_id.to_string(),
126                    owner: process_lease_owner_from_columns(
127                        owner_id,
128                        incarnation_id,
129                        liveness_json,
130                    ),
131                    lease_token,
132                    fencing_token: row.get::<_, i64>(2)? as u64,
133                    claimed_at_epoch_ms: row.get::<_, i64>(3)? as u64,
134                    expires_at_epoch_ms: row.get::<_, i64>(4)? as u64,
135                }))
136            },
137        )
138        .optional()
139        .map(|lease| lease.flatten())
140        .map_err(process_sqlite_error)
141    }
142
143    /// Insert-or-replace the persisted lease row for `process_id` with a fresh
144    /// lease owned by `owner` at `fencing_token`.
145    fn acquire_process_lease_conn(
146        conn: &Connection,
147        process_id: &str,
148        owner: &LeaseOwnerIdentity,
149        fencing_token: u64,
150        now: u64,
151        lease_ttl_ms: u64,
152    ) -> Result<ProcessLease, lash_core::PluginError> {
153        let lease = ProcessLease {
154            schema_version: PROCESS_LEASE_SCHEMA_VERSION,
155            process_id: process_id.to_string(),
156            owner: owner.clone(),
157            lease_token: format!(
158                "{:x}",
159                Sha256::digest(
160                    format!(
161                        "{process_id}:{}:{}:{now}:{fencing_token}",
162                        owner.owner_id, owner.incarnation_id
163                    )
164                    .as_bytes()
165                )
166            ),
167            fencing_token,
168            claimed_at_epoch_ms: now,
169            expires_at_epoch_ms: now.saturating_add(lease_ttl_ms),
170        };
171        conn.execute(
172            "INSERT INTO process_leases (
173                process_id, lease_owner_id, lease_owner_incarnation_id,
174                lease_owner_liveness_json, lease_token, lease_fencing_token,
175                lease_claimed_at_ms, lease_expires_at_ms
176             )
177             VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8)
178             ON CONFLICT(process_id) DO UPDATE SET
179                lease_owner_id = excluded.lease_owner_id,
180                lease_owner_incarnation_id = excluded.lease_owner_incarnation_id,
181                lease_owner_liveness_json = excluded.lease_owner_liveness_json,
182                lease_token = excluded.lease_token,
183                lease_fencing_token = excluded.lease_fencing_token,
184                lease_claimed_at_ms = excluded.lease_claimed_at_ms,
185                lease_expires_at_ms = excluded.lease_expires_at_ms",
186            params![
187                lease.process_id.as_str(),
188                lease.owner.owner_id.as_str(),
189                lease.owner.incarnation_id.as_str(),
190                encode_process_lease_liveness(&lease.owner.liveness)?,
191                lease.lease_token.as_str(),
192                lease.fencing_token as i64,
193                lease.claimed_at_epoch_ms as i64,
194                lease.expires_at_epoch_ms as i64,
195            ],
196        )
197        .map_err(process_sqlite_error)?;
198        Ok(lease)
199    }
200
201    fn list_grants_for_scope_conn(
202        conn: &Connection,
203        session_scope: &SessionScope,
204        live_only: bool,
205    ) -> Result<Vec<ProcessHandleGrantEntry>, lash_core::PluginError> {
206        let session_scope_id = session_scope.id();
207        let status_clause = if live_only {
208            "AND p.status = 'running'"
209        } else {
210            ""
211        };
212        let mut stmt = conn
213            .prepare(&format!(
214                "SELECT g.process_id, g.descriptor_json, p.record_json
215                 FROM process_handle_grants g
216                 JOIN processes p ON p.process_id = g.process_id
217                 WHERE g.scope_id = ?1 {status_clause}
218                 ORDER BY g.process_id ASC"
219            ))
220            .map_err(process_sqlite_error)?;
221        let rows = stmt
222            .query_map(params![session_scope_id.as_str()], |row| {
223                Ok((
224                    row.get::<_, String>(0)?,
225                    row.get::<_, String>(1)?,
226                    row.get::<_, String>(2)?,
227                ))
228            })
229            .map_err(process_sqlite_error)?;
230        let mut entries = Vec::new();
231        for row in rows {
232            let (process_id, descriptor_json, record_json) = row.map_err(process_sqlite_error)?;
233            let descriptor: ProcessHandleDescriptor =
234                serde_json::from_str(&descriptor_json).map_err(process_decode_error)?;
235            let record: ProcessRecord =
236                serde_json::from_str(&record_json).map_err(process_decode_error)?;
237            entries.push((
238                ProcessHandleGrant {
239                    session_id: session_scope.session_id.clone(),
240                    process_id,
241                    descriptor,
242                },
243                record,
244            ));
245        }
246        Ok(entries)
247    }
248}
249
250/// Map a `Result<T, PluginError>` produced by a synchronous transaction body to
251/// a [`TxOutcome`]: commit on success, roll back on logical error. Both arms
252/// carry the inner `Result` back so the caller recovers the value or the
253/// `PluginError` after the transaction resolves.
254fn tx_outcome<T>(
255    result: Result<T, lash_core::PluginError>,
256) -> TxOutcome<Result<T, lash_core::PluginError>> {
257    match result {
258        Ok(value) => TxOutcome::Commit(Ok(value)),
259        Err(err) => TxOutcome::Rollback(Err(err)),
260    }
261}
262
263#[async_trait::async_trait]
264impl ProcessRegistry for SqliteProcessRegistry {
265    fn durability_tier(&self) -> DurabilityTier {
266        DurabilityTier::Durable
267    }
268
269    async fn register_process(
270        &self,
271        registration: ProcessRegistration,
272    ) -> Result<ProcessRecord, lash_core::PluginError> {
273        let (registration, registration_hash) = prepare_process_registration(registration)?;
274        let record = self
275            .conn
276            .write_flow(move |tx| {
277                Ok(tx_outcome((|| {
278                    if let Some(existing) = Self::load_process_conn(tx, &registration.id)? {
279                        if existing.registration_hash == registration_hash {
280                            return Ok(existing);
281                        }
282                        return Err(lash_core::PluginError::Session(format!(
283                            "process `{}` registration hash conflict: existing {}, new {}",
284                            registration.id, existing.registration_hash, registration_hash
285                        )));
286                    }
287                    let now = current_epoch_ms();
288                    let record = ProcessRecord::from_prepared_registration(
289                        registration,
290                        registration_hash,
291                        now,
292                    );
293                    let originator_scope_id = record.originator_scope_id();
294                    tx.execute(
295                        "INSERT INTO processes (
296                            process_id, registration_hash, owner_scope_id,
297                            created_at_ms, updated_at_ms, status, record_json
298                         )
299                         VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7)",
300                        params![
301                            record.id.as_str(),
302                            record.registration_hash.as_str(),
303                            originator_scope_id.as_str(),
304                            record.created_at_ms as i64,
305                            record.updated_at_ms as i64,
306                            process_status_label(&record),
307                            process_encode_json(&record)?,
308                        ],
309                    )
310                    .map_err(process_sqlite_error)?;
311                    Ok(record)
312                })()))
313            })
314            .await
315            .map_err(process_sqlite_error)??;
316        Ok(record)
317    }
318
319    async fn set_external_ref(
320        &self,
321        process_id: &str,
322        external_ref: ProcessExternalRef,
323    ) -> Result<ProcessRecord, lash_core::PluginError> {
324        let process_id = process_id.to_string();
325        let (record, _changed) = self
326            .conn
327            .write_flow(move |tx| {
328                Ok(tx_outcome((|| {
329                    let mut record =
330                        Self::load_process_conn(tx, &process_id)?.ok_or_else(|| {
331                            lash_core::PluginError::Session(format!(
332                                "unknown process `{process_id}`"
333                            ))
334                        })?;
335                    if let Some(existing) = &record.external_ref {
336                        if existing == &external_ref {
337                            return Ok((record, false));
338                        }
339                        return Err(process_external_ref_conflict(
340                            &process_id,
341                            existing,
342                            &external_ref,
343                        ));
344                    }
345                    record.external_ref = Some(external_ref);
346                    record.updated_at_ms = current_epoch_ms();
347                    Self::save_process_conn(tx, &record)?;
348                    Ok((record, true))
349                })()))
350            })
351            .await
352            .map_err(process_sqlite_error)??;
353        Ok(record)
354    }
355
356    async fn grant_handle(
357        &self,
358        session_scope: &SessionScope,
359        process_id: &str,
360        descriptor: ProcessHandleDescriptor,
361    ) -> Result<ProcessHandleGrant, lash_core::PluginError> {
362        let session_scope = session_scope.clone();
363        let process_id = process_id.to_string();
364        self.conn
365            .write_flow(move |tx| {
366                Ok(tx_outcome((|| {
367                    let session_scope_id = session_scope.id();
368                    if Self::load_process_conn(tx, &process_id)?.is_none() {
369                        return Err(lash_core::PluginError::Session(format!(
370                            "unknown process `{process_id}`"
371                        )));
372                    }
373                    tx.execute(
374                        "INSERT INTO process_handle_grants (session_id, scope_id, process_id, descriptor_json)
375                         VALUES (?1, ?2, ?3, ?4)
376                         ON CONFLICT(scope_id, process_id) DO UPDATE SET
377                            session_id = excluded.session_id,
378                            descriptor_json = excluded.descriptor_json",
379                        params![
380                            session_scope.session_id.as_str(),
381                            session_scope_id.as_str(),
382                            process_id.as_str(),
383                            process_encode_json(&descriptor)?
384                        ],
385                    )
386                    .map_err(process_sqlite_error)?;
387                    Ok(ProcessHandleGrant {
388                        session_id: session_scope.session_id.clone(),
389                        process_id: process_id.clone(),
390                        descriptor,
391                    })
392                })()))
393            })
394            .await
395            .map_err(process_sqlite_error)?
396    }
397
398    async fn revoke_handle(
399        &self,
400        session_scope: &SessionScope,
401        process_id: &str,
402    ) -> Result<(), lash_core::PluginError> {
403        let session_scope_id = session_scope.id().as_str().to_string();
404        let process_id = process_id.to_string();
405        self.conn
406            .call(move |conn| {
407                conn.execute(
408                    "DELETE FROM process_handle_grants WHERE scope_id = ?1 AND process_id = ?2",
409                    params![session_scope_id, process_id],
410                )
411            })
412            .await
413            .map_err(process_sqlite_error)?;
414        Ok(())
415    }
416
417    async fn transfer_handle_grants(
418        &self,
419        from_scope: &SessionScope,
420        to_scope: &SessionScope,
421        process_ids: &[String],
422    ) -> Result<(), lash_core::PluginError> {
423        let from_scope = from_scope.clone();
424        let to_scope = to_scope.clone();
425        let process_ids = process_ids.to_vec();
426        self.conn
427            .write_flow(move |tx| {
428                Ok(tx_outcome((|| {
429                    let from_scope_id = from_scope.id();
430                    let to_scope_id = to_scope.id();
431                    for process_id in &process_ids {
432                        let descriptor_json: Option<String> = tx
433                            .query_row(
434                                "SELECT descriptor_json
435                                 FROM process_handle_grants
436                                 WHERE scope_id = ?1 AND process_id = ?2",
437                                params![from_scope_id.as_str(), process_id.as_str()],
438                                |row| row.get(0),
439                            )
440                            .optional()
441                            .map_err(process_sqlite_error)?;
442                        let Some(descriptor_json) = descriptor_json else {
443                            return Err(lash_core::PluginError::Session(format!(
444                                "process handle `{process_id}` is not granted to session `{}`",
445                                from_scope.session_id
446                            )));
447                        };
448                        tx.execute(
449                            "DELETE FROM process_handle_grants
450                             WHERE scope_id = ?1 AND process_id = ?2",
451                            params![from_scope_id.as_str(), process_id.as_str()],
452                        )
453                        .map_err(process_sqlite_error)?;
454                        tx.execute(
455                            "INSERT INTO process_handle_grants (session_id, scope_id, process_id, descriptor_json)
456                             VALUES (?1, ?2, ?3, ?4)
457                             ON CONFLICT(scope_id, process_id) DO UPDATE SET
458                                session_id = excluded.session_id,
459                                descriptor_json = excluded.descriptor_json",
460                            params![
461                                to_scope.session_id.as_str(),
462                                to_scope_id.as_str(),
463                                process_id.as_str(),
464                                descriptor_json
465                            ],
466                        )
467                        .map_err(process_sqlite_error)?;
468                    }
469                    Ok(())
470                })()))
471            })
472            .await
473            .map_err(process_sqlite_error)?
474    }
475
476    async fn list_handle_grants(
477        &self,
478        session_scope: &SessionScope,
479    ) -> Result<Vec<ProcessHandleGrantEntry>, lash_core::PluginError> {
480        let session_scope = session_scope.clone();
481        self.conn
482            .call(move |conn| {
483                Ok(Self::list_grants_for_scope_conn(
484                    conn,
485                    &session_scope,
486                    false,
487                ))
488            })
489            .await
490            .map_err(process_sqlite_error)?
491    }
492
493    async fn list_live_handle_grants(
494        &self,
495        session_scope: &SessionScope,
496    ) -> Result<Vec<ProcessHandleGrantEntry>, lash_core::PluginError> {
497        let session_scope = session_scope.clone();
498        self.conn
499            .call(move |conn| Ok(Self::list_grants_for_scope_conn(conn, &session_scope, true)))
500            .await
501            .map_err(process_sqlite_error)?
502    }
503
504    async fn has_handle_grant(
505        &self,
506        session_scope: &SessionScope,
507        process_id: &str,
508    ) -> Result<bool, lash_core::PluginError> {
509        let session_scope_id = session_scope.id().as_str().to_string();
510        let process_id = process_id.to_string();
511        self.conn
512            .call(move |conn| {
513                let exists = conn
514                    .query_row(
515                        "SELECT 1
516                         FROM process_handle_grants g
517                         JOIN processes p ON p.process_id = g.process_id
518                         WHERE g.scope_id = ?1 AND g.process_id = ?2
519                         LIMIT 1",
520                        params![session_scope_id, process_id],
521                        |_| Ok(()),
522                    )
523                    .optional()?
524                    .is_some();
525                Ok(exists)
526            })
527            .await
528            .map_err(process_sqlite_error)
529    }
530
531    async fn handle_grants_for_process(
532        &self,
533        process_id: &str,
534    ) -> Result<Vec<ProcessHandleGrant>, lash_core::PluginError> {
535        let process_id = process_id.to_string();
536        self.conn
537            .call(move |conn| {
538                Ok((|| {
539                    if Self::load_process_conn(conn, &process_id)?.is_none() {
540                        return Err(lash_core::PluginError::Session(format!(
541                            "unknown process `{process_id}`"
542                        )));
543                    }
544                    let mut stmt = conn
545                        .prepare(
546                            "SELECT session_id, descriptor_json
547                             FROM process_handle_grants
548                             WHERE process_id = ?1
549                             ORDER BY session_id ASC, scope_id ASC",
550                        )
551                        .map_err(process_sqlite_error)?;
552                    let rows = stmt
553                        .query_map(params![process_id], |row| {
554                            Ok((row.get::<_, String>(0)?, row.get::<_, String>(1)?))
555                        })
556                        .map_err(process_sqlite_error)?;
557                    let mut grants = Vec::new();
558                    for row in rows {
559                        let (session_id, descriptor_json) = row.map_err(process_sqlite_error)?;
560                        let descriptor: ProcessHandleDescriptor =
561                            serde_json::from_str(&descriptor_json).map_err(process_decode_error)?;
562                        grants.push(ProcessHandleGrant {
563                            session_id,
564                            process_id: process_id.clone(),
565                            descriptor,
566                        });
567                    }
568                    Ok(grants)
569                })())
570            })
571            .await
572            .map_err(process_sqlite_error)?
573    }
574
575    async fn delete_session_process_state(
576        &self,
577        session_id: &str,
578    ) -> Result<lash_core::ProcessSessionDeleteReport, lash_core::PluginError> {
579        let session_id_owned = session_id.to_string();
580        let (
581            revoked_handle_count,
582            deleted_wake_count,
583            mut orphaned_process_ids,
584            mut preserved_process_ids,
585        ) = self
586            .conn
587            .write_flow(move |tx| {
588                Ok(tx_outcome((|| {
589                    let session_id = session_id_owned;
590                    let removed = {
591                        let mut stmt = tx
592                            .prepare(
593                                "SELECT g.process_id, p.record_json
594                                 FROM process_handle_grants g
595                                 JOIN processes p ON p.process_id = g.process_id
596                                 WHERE g.session_id = ?1
597                                 ORDER BY g.process_id ASC",
598                            )
599                            .map_err(process_sqlite_error)?;
600                        let rows = stmt
601                            .query_map(params![session_id], |row| {
602                                Ok((row.get::<_, String>(0)?, row.get::<_, String>(1)?))
603                            })
604                            .map_err(process_sqlite_error)?;
605                        let mut removed = Vec::new();
606                        for row in rows {
607                            let (process_id, record_json) = row.map_err(process_sqlite_error)?;
608                            let record: ProcessRecord =
609                                serde_json::from_str(&record_json).map_err(process_decode_error)?;
610                            removed.push((process_id, record));
611                        }
612                        removed
613                    };
614
615                    // Wake acknowledgements are process-scoped consumed-event markers.
616                    // Session deletion removes materialized session-addressed deliveries
617                    // through the session store; clearing these rows would re-expose
618                    // already-consumed wakes to surviving grants or future host readers.
619                    let deleted_wake_count = 0;
620                    let revoked_handle_count = tx
621                        .execute(
622                            "DELETE FROM process_handle_grants WHERE session_id = ?1",
623                            params![session_id],
624                        )
625                        .map_err(process_sqlite_error)?;
626                    let mut orphaned_process_ids = Vec::new();
627                    let mut preserved_process_ids = Vec::new();
628                    for (process_id, record) in removed {
629                        if record.is_terminal() {
630                            continue;
631                        }
632                        let remaining_grants: i64 = tx
633                            .query_row(
634                                "SELECT COUNT(*) FROM process_handle_grants WHERE process_id = ?1",
635                                params![process_id],
636                                |row| row.get(0),
637                            )
638                            .map_err(process_sqlite_error)?;
639                        if remaining_grants == 0 {
640                            orphaned_process_ids.push(process_id);
641                        } else {
642                            preserved_process_ids.push(process_id);
643                        }
644                    }
645                    let wake_targeted = {
646                        let mut stmt = tx
647                            .prepare("SELECT process_id, record_json FROM processes ORDER BY process_id ASC")
648                            .map_err(process_sqlite_error)?;
649                        let rows = stmt
650                            .query_map([], |row| {
651                                Ok((row.get::<_, String>(0)?, row.get::<_, String>(1)?))
652                            })
653                            .map_err(process_sqlite_error)?;
654                        let mut records = Vec::new();
655                        for row in rows {
656                            let (process_id, record_json) = row.map_err(process_sqlite_error)?;
657                            let record: ProcessRecord =
658                                serde_json::from_str(&record_json).map_err(process_decode_error)?;
659                            records.push((process_id, record));
660                        }
661                        records
662                    };
663                    for (_process_id, mut record) in wake_targeted {
664                        if record.clear_wake_target_for_session(&session_id) {
665                            Self::save_process_conn(tx, &record)?;
666                        }
667                    }
668                    Ok((
669                        revoked_handle_count,
670                        deleted_wake_count,
671                        orphaned_process_ids,
672                        preserved_process_ids,
673                    ))
674                })()))
675            })
676            .await
677            .map_err(process_sqlite_error)??;
678        orphaned_process_ids.sort();
679        orphaned_process_ids.dedup();
680        preserved_process_ids.sort();
681        preserved_process_ids.dedup();
682        Ok(lash_core::ProcessSessionDeleteReport {
683            session_id: session_id.to_string(),
684            revoked_handle_count,
685            deleted_wake_count,
686            orphaned_process_ids,
687            preserved_process_ids,
688        })
689    }
690
691    async fn append_event(
692        &self,
693        process_id: &str,
694        request: ProcessEventAppendRequest,
695    ) -> Result<ProcessEventAppendResult, lash_core::PluginError> {
696        let process_id = process_id.to_string();
697        let (result, _appended) = self
698            .conn
699            .write_flow(move |tx| {
700                Ok(tx_outcome((|| {
701                    let mut record =
702                        Self::load_process_conn(tx, &process_id)?.ok_or_else(|| {
703                            lash_core::PluginError::Session(format!(
704                                "unknown process `{process_id}`"
705                            ))
706                        })?;
707                    let replay_lookup = if let Some(replay_key) =
708                        request.replay.as_ref().map(|replay| replay.key.as_str())
709                    {
710                        Self::load_event_by_key_conn(tx, &process_id, replay_key)?
711                    } else {
712                        None
713                    };
714                    let sequence = tx
715                        .query_row(
716                            "SELECT COALESCE(MAX(sequence), 0) + 1 FROM process_events WHERE process_id = ?1",
717                            params![process_id],
718                            |row| row.get::<_, i64>(0),
719                        )
720                        .map_err(process_sqlite_error)? as u64;
721                    let occurred_at_ms = current_epoch_ms();
722                    let prepared = prepare_process_event_append(
723                        &record,
724                        request,
725                        sequence,
726                        replay_lookup,
727                        occurred_at_ms,
728                    )?;
729                    match prepared {
730                        lash_core::ProcessEventAppendPlan::Replay {
731                            event,
732                            repair_status,
733                            wake_delivery,
734                            occurred_at_ms,
735                        } => {
736                            let repaired = if let Some(status) = repair_status {
737                                lash_core::apply_process_status_projection(
738                                    &mut record,
739                                    status,
740                                    occurred_at_ms,
741                                );
742                                Self::save_process_conn(tx, &record)?;
743                                true
744                            } else {
745                                false
746                            };
747                            Ok((
748                                ProcessEventAppendResult {
749                                    event,
750                                    wake_delivery,
751                                },
752                                repaired,
753                            ))
754                        }
755                        lash_core::ProcessEventAppendPlan::Insert {
756                            event,
757                            payload_hash,
758                            status_update,
759                            wake_delivery,
760                            occurred_at_ms,
761                        } => {
762                            tx.execute(
763                                "INSERT INTO process_events (
764                                    process_id, sequence, event_type, payload_hash, idempotency_key,
765                                    occurred_at_ms, event_json
766                                 )
767                                 VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7)",
768                                params![
769                                    process_id,
770                                    sequence as i64,
771                                    event.event_type.as_str(),
772                                    payload_hash.as_str(),
773                                    event.invocation.replay_key(),
774                                    occurred_at_ms as i64,
775                                    process_encode_json(&event)?,
776                                ],
777                            )
778                            .map_err(process_sqlite_error)?;
779                            if let Some(status) = status_update {
780                                lash_core::apply_process_status_projection(
781                                    &mut record,
782                                    status,
783                                    occurred_at_ms,
784                                );
785                            } else {
786                                record.updated_at_ms = occurred_at_ms;
787                            }
788                            Self::save_process_conn(tx, &record)?;
789                            Ok((
790                                ProcessEventAppendResult {
791                                    event,
792                                    wake_delivery,
793                                },
794                                true,
795                            ))
796                        }
797                    }
798                })()))
799            })
800            .await
801            .map_err(process_sqlite_error)??;
802        Ok(result)
803    }
804
805    async fn events_after(
806        &self,
807        process_id: &str,
808        after_sequence: u64,
809    ) -> Result<Vec<ProcessEvent>, lash_core::PluginError> {
810        let process_id = process_id.to_string();
811        self.conn
812            .call(move |conn| {
813                Ok((|| {
814                    if Self::load_process_conn(conn, &process_id)?.is_none() {
815                        return Err(lash_core::PluginError::Session(format!(
816                            "unknown process `{process_id}`"
817                        )));
818                    }
819                    let mut stmt = conn
820                        .prepare(
821                            "SELECT event_json FROM process_events
822                             WHERE process_id = ?1 AND sequence > ?2
823                             ORDER BY sequence ASC",
824                        )
825                        .map_err(process_sqlite_error)?;
826                    let rows = stmt
827                        .query_map(params![process_id, after_sequence as i64], |row| {
828                            row.get::<_, String>(0)
829                        })
830                        .map_err(process_sqlite_error)?;
831                    let mut events = Vec::new();
832                    for row in rows {
833                        events.push(
834                            serde_json::from_str(&row.map_err(process_sqlite_error)?)
835                                .map_err(process_decode_error)?,
836                        );
837                    }
838                    Ok(events)
839                })())
840            })
841            .await
842            .map_err(process_sqlite_error)?
843    }
844
845    async fn count_events_through(
846        &self,
847        process_id: &str,
848        event_type: &str,
849        up_to_sequence: u64,
850    ) -> Result<u64, lash_core::PluginError> {
851        let process_id = process_id.to_string();
852        let event_type = event_type.to_string();
853        self.conn
854            .call(move |conn| {
855                Ok((|| {
856                    if Self::load_process_conn(conn, &process_id)?.is_none() {
857                        return Err(lash_core::PluginError::Session(format!(
858                            "unknown process `{process_id}`"
859                        )));
860                    }
861                    conn.query_row(
862                        "SELECT COUNT(*) FROM process_events
863                         WHERE process_id = ?1 AND event_type = ?2 AND sequence <= ?3",
864                        params![process_id, event_type, up_to_sequence as i64],
865                        |row| row.get::<_, i64>(0),
866                    )
867                    .map(|count| count as u64)
868                    .map_err(process_sqlite_error)
869                })())
870            })
871            .await
872            .map_err(process_sqlite_error)?
873    }
874
875    async fn recent_events(
876        &self,
877        process_id: &str,
878        limit: usize,
879    ) -> Result<Vec<ProcessEvent>, lash_core::PluginError> {
880        let process_id = process_id.to_string();
881        self.conn
882            .call(move |conn| {
883                Ok((|| {
884                    if Self::load_process_conn(conn, &process_id)?.is_none() {
885                        return Err(lash_core::PluginError::Session(format!(
886                            "unknown process `{process_id}`"
887                        )));
888                    }
889                    let mut stmt = conn
890                        .prepare(
891                            "SELECT event_json FROM process_events
892                             WHERE process_id = ?1
893                             ORDER BY sequence DESC
894                             LIMIT ?2",
895                        )
896                        .map_err(process_sqlite_error)?;
897                    let rows = stmt
898                        .query_map(params![process_id, limit as i64], |row| {
899                            row.get::<_, String>(0)
900                        })
901                        .map_err(process_sqlite_error)?;
902                    let mut events: Vec<ProcessEvent> = Vec::new();
903                    for row in rows {
904                        events.push(
905                            serde_json::from_str(&row.map_err(process_sqlite_error)?)
906                                .map_err(process_decode_error)?,
907                        );
908                    }
909                    events.reverse();
910                    Ok(events)
911                })())
912            })
913            .await
914            .map_err(process_sqlite_error)?
915    }
916
917    async fn wake_events_after(
918        &self,
919        process_id: &str,
920        after_sequence: u64,
921    ) -> Result<Vec<ProcessEvent>, lash_core::PluginError> {
922        let acked: std::collections::HashSet<u64> = {
923            let process_id = process_id.to_string();
924            self.conn
925                .call(move |conn| {
926                    Ok(
927                        (|| -> Result<std::collections::HashSet<u64>, lash_core::PluginError> {
928                            let mut stmt = conn
929                                .prepare(
930                                    "SELECT sequence FROM process_wake_acks WHERE process_id = ?1",
931                                )
932                                .map_err(process_sqlite_error)?;
933                            let rows = stmt
934                                .query_map(params![process_id], |row| row.get::<_, i64>(0))
935                                .map_err(process_sqlite_error)?;
936                            let mut set = std::collections::HashSet::new();
937                            for row in rows {
938                                set.insert(row.map_err(process_sqlite_error)? as u64);
939                            }
940                            Ok(set)
941                        })(),
942                    )
943                })
944                .await
945                .map_err(process_sqlite_error)??
946        };
947        Ok(self
948            .events_after(process_id, after_sequence)
949            .await?
950            .into_iter()
951            .filter(|event| event.semantics.wake.is_some() && !acked.contains(&event.sequence))
952            .collect())
953    }
954
955    async fn complete_process(
956        &self,
957        process_id: &str,
958        await_output: ProcessAwaitOutput,
959    ) -> Result<ProcessRecord, lash_core::PluginError> {
960        let event_type = match await_output.terminal_state() {
961            lash_core::ProcessTerminalState::Completed => "process.completed",
962            lash_core::ProcessTerminalState::Failed => "process.failed",
963            lash_core::ProcessTerminalState::Cancelled => "process.cancelled",
964            lash_core::ProcessTerminalState::Abandoned => "process.abandoned",
965        };
966        self.append_event(
967            process_id,
968            ProcessEventAppendRequest::new(
969                event_type,
970                serde_json::json!({ "await_output": await_output }),
971            )
972            .with_replay_key(format!("process:{process_id}:terminal:{event_type}")),
973        )
974        .await?;
975        self.get_process(process_id).await.ok_or_else(|| {
976            lash_core::PluginError::Session(format!(
977                "unknown process `{process_id}` after terminal event"
978            ))
979        })
980    }
981
982    async fn record_first_started(
983        &self,
984        process_id: &str,
985        started: ProcessStarted,
986    ) -> Result<ProcessRecord, lash_core::PluginError> {
987        let process_id = process_id.to_string();
988        self.conn
989            .write_flow(move |tx| {
990                Ok(tx_outcome((|| {
991                    let mut record =
992                        Self::load_process_conn(tx, &process_id)?.ok_or_else(|| {
993                            lash_core::PluginError::Session(format!(
994                                "unknown process `{process_id}`"
995                            ))
996                        })?;
997                    // First-writer-wins: the started fact is immutable once written.
998                    if record.first_started.is_none() {
999                        record.first_started = Some(Box::new(started));
1000                        record.updated_at_ms = current_epoch_ms();
1001                        Self::save_process_conn(tx, &record)?;
1002                    }
1003                    Ok(record)
1004                })()))
1005            })
1006            .await
1007            .map_err(process_sqlite_error)?
1008    }
1009
1010    async fn request_process_abandon(
1011        &self,
1012        process_id: &str,
1013        request: AbandonRequest,
1014    ) -> Result<ProcessRecord, lash_core::PluginError> {
1015        let process_id = process_id.to_string();
1016        self.conn
1017            .write_flow(move |tx| {
1018                Ok(tx_outcome((|| {
1019                    let mut record =
1020                        Self::load_process_conn(tx, &process_id)?.ok_or_else(|| {
1021                            lash_core::PluginError::Session(format!(
1022                                "unknown process `{process_id}`"
1023                            ))
1024                        })?;
1025                    if record.is_terminal() {
1026                        return Err(lash_core::PluginError::Session(format!(
1027                            "terminal process `{process_id}` cannot accept an abandon request"
1028                        )));
1029                    }
1030                    // First-writer-wins: preserve the original recorded authorization.
1031                    if record.abandon_request.is_none() {
1032                        record.abandon_request = Some(Box::new(request));
1033                        record.updated_at_ms = current_epoch_ms();
1034                        Self::save_process_conn(tx, &record)?;
1035                    }
1036                    Ok(record)
1037                })()))
1038            })
1039            .await
1040            .map_err(process_sqlite_error)?
1041    }
1042
1043    async fn set_process_wait(
1044        &self,
1045        process_id: &str,
1046        wait: lash_core::WaitState,
1047    ) -> Result<ProcessRecord, lash_core::PluginError> {
1048        let process_id = process_id.to_string();
1049        self.conn
1050            .write_flow(move |tx| {
1051                Ok(tx_outcome((|| {
1052                    let mut record =
1053                        Self::load_process_conn(tx, &process_id)?.ok_or_else(|| {
1054                            lash_core::PluginError::Session(format!(
1055                                "unknown process `{process_id}`"
1056                            ))
1057                        })?;
1058                    if record.is_terminal() {
1059                        return Err(lash_core::PluginError::Session(format!(
1060                            "terminal process `{process_id}` cannot enter a wait state"
1061                        )));
1062                    }
1063                    record.wait = Some(wait);
1064                    record.updated_at_ms = current_epoch_ms();
1065                    Self::save_process_conn(tx, &record)?;
1066                    Ok(record)
1067                })()))
1068            })
1069            .await
1070            .map_err(process_sqlite_error)?
1071    }
1072
1073    async fn clear_process_wait(
1074        &self,
1075        process_id: &str,
1076    ) -> Result<ProcessRecord, lash_core::PluginError> {
1077        let process_id = process_id.to_string();
1078        self.conn
1079            .write_flow(move |tx| {
1080                Ok(tx_outcome((|| {
1081                    let mut record =
1082                        Self::load_process_conn(tx, &process_id)?.ok_or_else(|| {
1083                            lash_core::PluginError::Session(format!(
1084                                "unknown process `{process_id}`"
1085                            ))
1086                        })?;
1087                    record.wait = None;
1088                    record.updated_at_ms = current_epoch_ms();
1089                    Self::save_process_conn(tx, &record)?;
1090                    Ok(record)
1091                })()))
1092            })
1093            .await
1094            .map_err(process_sqlite_error)?
1095    }
1096
1097    async fn get_process(&self, process_id: &str) -> Option<ProcessRecord> {
1098        let process_id = process_id.to_string();
1099        self.conn
1100            .call(move |conn| Ok(Self::load_process_conn(conn, &process_id).ok().flatten()))
1101            .await
1102            .ok()
1103            .flatten()
1104    }
1105
1106    async fn list_processes(
1107        &self,
1108        filter: &lash_core::ProcessListFilter,
1109    ) -> Result<Vec<ProcessRecord>, lash_core::PluginError> {
1110        let filter = filter.clone();
1111        self.conn
1112            .call(move |conn| {
1113                Ok((|| {
1114                    let mut stmt = conn
1115                        .prepare(
1116                            "SELECT record_json FROM processes
1117                             ORDER BY process_id ASC",
1118                        )
1119                        .map_err(process_sqlite_error)?;
1120                    let rows = stmt
1121                        .query_map([], |row| row.get::<_, String>(0))
1122                        .map_err(process_sqlite_error)?;
1123                    let mut records = Vec::new();
1124                    for row in rows {
1125                        let record: ProcessRecord =
1126                            serde_json::from_str(&row.map_err(process_sqlite_error)?)
1127                                .map_err(process_decode_error)?;
1128                        if filter.matches_record(&record) {
1129                            records.push(record);
1130                        }
1131                    }
1132                    Ok(records)
1133                })())
1134            })
1135            .await
1136            .map_err(process_sqlite_error)?
1137    }
1138
1139    async fn ack_wake(
1140        &self,
1141        process_id: &str,
1142        sequence: u64,
1143    ) -> Result<(), lash_core::PluginError> {
1144        let process_id = process_id.to_string();
1145        self.conn
1146            .call(move |conn| {
1147                Ok((|| {
1148                    if Self::load_process_conn(conn, &process_id)?.is_none() {
1149                        return Err(lash_core::PluginError::Session(format!(
1150                            "unknown process `{process_id}`"
1151                        )));
1152                    }
1153                    conn.execute(
1154                        "INSERT OR IGNORE INTO process_wake_acks (process_id, sequence) VALUES (?1, ?2)",
1155                        params![process_id, sequence as i64],
1156                    )
1157                    .map_err(process_sqlite_error)?;
1158                    Ok(())
1159                })())
1160            })
1161            .await
1162            .map_err(process_sqlite_error)?
1163    }
1164
1165    async fn list_non_terminal(&self) -> Result<Vec<ProcessRecord>, lash_core::PluginError> {
1166        self.conn
1167            .call(move |conn| {
1168                Ok((|| {
1169                    let mut stmt = conn
1170                        .prepare(
1171                            "SELECT record_json FROM processes
1172                             WHERE status = 'running'
1173                             ORDER BY process_id ASC",
1174                        )
1175                        .map_err(process_sqlite_error)?;
1176                    let rows = stmt
1177                        .query_map([], |row| row.get::<_, String>(0))
1178                        .map_err(process_sqlite_error)?;
1179                    let mut records = Vec::new();
1180                    for row in rows {
1181                        let record: ProcessRecord =
1182                            serde_json::from_str(&row.map_err(process_sqlite_error)?)
1183                                .map_err(process_decode_error)?;
1184                        records.push(record);
1185                    }
1186                    Ok(records)
1187                })())
1188            })
1189            .await
1190            .map_err(process_sqlite_error)?
1191    }
1192
1193    async fn claim_process_lease(
1194        &self,
1195        process_id: &str,
1196        owner: &LeaseOwnerIdentity,
1197        lease_ttl_ms: u64,
1198    ) -> Result<ProcessLeaseClaimOutcome, lash_core::PluginError> {
1199        let process_id = process_id.to_string();
1200        let owner = owner.clone();
1201        self.conn
1202            .write_flow(move |tx| {
1203                Ok(tx_outcome((|| {
1204                    if Self::load_process_conn(tx, &process_id)?.is_none() {
1205                        return Err(lash_core::PluginError::Session(format!(
1206                            "unknown process `{process_id}`"
1207                        )));
1208                    }
1209                    let now = current_epoch_ms();
1210                    let current = Self::load_process_lease_conn(tx, &process_id)?;
1211                    if let Some(current) = current.as_ref()
1212                        && current.expires_at_epoch_ms > now
1213                    {
1214                        if current.owner.same_incarnation(&owner) {
1215                            // Same incarnation re-enters its own live lease:
1216                            // extend the expiry, keep token and fencing token.
1217                            let expires_at = now.saturating_add(lease_ttl_ms);
1218                            tx.execute(
1219                                "UPDATE process_leases
1220                                 SET lease_expires_at_ms = ?2
1221                                 WHERE process_id = ?1",
1222                                params![process_id, expires_at as i64],
1223                            )
1224                            .map_err(process_sqlite_error)?;
1225                            return Ok(ProcessLeaseClaimOutcome::Acquired(ProcessLease {
1226                                expires_at_epoch_ms: expires_at,
1227                                ..current.clone()
1228                            }));
1229                        }
1230                        return Ok(ProcessLeaseClaimOutcome::Busy {
1231                            holder: current.clone(),
1232                        });
1233                    }
1234                    // Read the raw fencing token directly: a completed/abandoned
1235                    // lease nulls the owner/token columns but retains the
1236                    // monotonically-increasing `lease_fencing_token`, so a
1237                    // re-claim never reuses a stale writer's token.
1238                    let fencing_token: u64 = tx
1239                        .query_row(
1240                            "SELECT lease_fencing_token FROM process_leases WHERE process_id = ?1",
1241                            params![process_id],
1242                            |row| row.get::<_, i64>(0),
1243                        )
1244                        .optional()
1245                        .map_err(process_sqlite_error)?
1246                        .unwrap_or(0) as u64
1247                        + 1;
1248                    Ok(ProcessLeaseClaimOutcome::Acquired(
1249                        Self::acquire_process_lease_conn(
1250                            tx,
1251                            &process_id,
1252                            &owner,
1253                            fencing_token,
1254                            now,
1255                            lease_ttl_ms,
1256                        )?,
1257                    ))
1258                })()))
1259            })
1260            .await
1261            .map_err(process_sqlite_error)?
1262    }
1263
1264    async fn reclaim_process_lease(
1265        &self,
1266        process_id: &str,
1267        owner: &LeaseOwnerIdentity,
1268        observed_holder: &ProcessLease,
1269        lease_ttl_ms: u64,
1270    ) -> Result<ProcessLeaseClaimOutcome, lash_core::PluginError> {
1271        let process_id = process_id.to_string();
1272        let owner = owner.clone();
1273        let observed_holder = observed_holder.clone();
1274        self.conn
1275            .write_flow(move |tx| {
1276                Ok(tx_outcome((|| {
1277                    if Self::load_process_conn(tx, &process_id)?.is_none() {
1278                        return Err(lash_core::PluginError::Session(format!(
1279                            "unknown process `{process_id}`"
1280                        )));
1281                    }
1282                    let now = current_epoch_ms();
1283                    let current = Self::load_process_lease_conn(tx, &process_id)?;
1284                    let Some(current) = current else {
1285                        // Free (or released) lease: acquire on the retained
1286                        // fencing token like a plain claim would.
1287                        let fencing_token: u64 = tx
1288                            .query_row(
1289                                "SELECT lease_fencing_token FROM process_leases WHERE process_id = ?1",
1290                                params![process_id],
1291                                |row| row.get::<_, i64>(0),
1292                            )
1293                            .optional()
1294                            .map_err(process_sqlite_error)?
1295                            .unwrap_or(0) as u64
1296                            + 1;
1297                        return Ok(ProcessLeaseClaimOutcome::Acquired(
1298                            Self::acquire_process_lease_conn(
1299                                tx,
1300                                &process_id,
1301                                &owner,
1302                                fencing_token,
1303                                now,
1304                                lease_ttl_ms,
1305                            )?,
1306                        ));
1307                    };
1308                    if current.expires_at_epoch_ms <= now {
1309                        return Ok(ProcessLeaseClaimOutcome::Acquired(
1310                            Self::acquire_process_lease_conn(
1311                                tx,
1312                                &process_id,
1313                                &owner,
1314                                current.fencing_token.saturating_add(1),
1315                                now,
1316                                lease_ttl_ms,
1317                            )?,
1318                        ));
1319                    }
1320                    // Fenced CAS on the observed holder: identity, token, and
1321                    // fencing token must all still match, and the holder must
1322                    // be definitely dead for this claimant.
1323                    if observed_holder.process_id == process_id
1324                        && current.owner.same_incarnation(&observed_holder.owner)
1325                        && current.lease_token == observed_holder.lease_token
1326                        && current.fencing_token == observed_holder.fencing_token
1327                        && current.owner.is_definitely_dead_for_claimant(&owner)
1328                    {
1329                        let fencing_token = current.fencing_token.saturating_add(1);
1330                        let lease = ProcessLease {
1331                            schema_version: PROCESS_LEASE_SCHEMA_VERSION,
1332                            process_id: process_id.clone(),
1333                            owner: owner.clone(),
1334                            lease_token: format!(
1335                                "{:x}",
1336                                Sha256::digest(
1337                                    format!(
1338                                        "{process_id}:{}:{}:{now}:{fencing_token}",
1339                                        owner.owner_id, owner.incarnation_id
1340                                    )
1341                                    .as_bytes()
1342                                )
1343                            ),
1344                            fencing_token,
1345                            claimed_at_epoch_ms: now,
1346                            expires_at_epoch_ms: now.saturating_add(lease_ttl_ms),
1347                        };
1348                        let changed = tx
1349                            .execute(
1350                                "UPDATE process_leases
1351                                 SET lease_owner_id = ?1,
1352                                     lease_owner_incarnation_id = ?2,
1353                                     lease_owner_liveness_json = ?3,
1354                                     lease_token = ?4,
1355                                     lease_fencing_token = ?5,
1356                                     lease_claimed_at_ms = ?6,
1357                                     lease_expires_at_ms = ?7
1358                                 WHERE process_id = ?8
1359                                   AND lease_owner_id = ?9
1360                                   AND lease_owner_incarnation_id = ?10
1361                                   AND lease_token = ?11
1362                                   AND lease_fencing_token = ?12",
1363                                params![
1364                                    lease.owner.owner_id,
1365                                    lease.owner.incarnation_id,
1366                                    encode_process_lease_liveness(&lease.owner.liveness)?,
1367                                    lease.lease_token,
1368                                    lease.fencing_token as i64,
1369                                    lease.claimed_at_epoch_ms as i64,
1370                                    lease.expires_at_epoch_ms as i64,
1371                                    process_id,
1372                                    observed_holder.owner.owner_id,
1373                                    observed_holder.owner.incarnation_id,
1374                                    observed_holder.lease_token,
1375                                    observed_holder.fencing_token as i64,
1376                                ],
1377                            )
1378                            .map_err(process_sqlite_error)?;
1379                        if changed == 1 {
1380                            return Ok(ProcessLeaseClaimOutcome::Acquired(lease));
1381                        }
1382                        // Lost the CAS race: re-read and report the winner.
1383                        if let Some(current) = Self::load_process_lease_conn(tx, &process_id)?
1384                            && current.expires_at_epoch_ms > now
1385                        {
1386                            return Ok(ProcessLeaseClaimOutcome::Busy { holder: current });
1387                        }
1388                        return Err(process_lease_expired(&process_id));
1389                    }
1390                    Ok(ProcessLeaseClaimOutcome::Busy { holder: current })
1391                })()))
1392            })
1393            .await
1394            .map_err(process_sqlite_error)?
1395    }
1396
1397    async fn renew_process_lease(
1398        &self,
1399        lease: &ProcessLease,
1400        lease_ttl_ms: u64,
1401    ) -> Result<ProcessLease, lash_core::PluginError> {
1402        let lease = lease.clone();
1403        self.conn
1404            .write_flow(move |tx| {
1405                Ok(tx_outcome((|| {
1406                    let now = current_epoch_ms();
1407                    let current = Self::load_process_lease_conn(tx, &lease.process_id)?;
1408                    if !guard_lease(current.as_ref(), &lease.lease_token, now)
1409                        || !current.as_ref().is_some_and(|current| {
1410                            current.owner.same_incarnation(&lease.owner)
1411                                && current.fencing_token == lease.fencing_token
1412                        })
1413                    {
1414                        return Err(process_lease_expired(&lease.process_id));
1415                    }
1416                    let renewed = ProcessLease {
1417                        expires_at_epoch_ms: now.saturating_add(lease_ttl_ms),
1418                        ..lease.clone()
1419                    };
1420                    tx.execute(
1421                        "UPDATE process_leases
1422                         SET lease_expires_at_ms = ?2
1423                         WHERE process_id = ?1 AND lease_token = ?3",
1424                        params![
1425                            renewed.process_id.as_str(),
1426                            renewed.expires_at_epoch_ms as i64,
1427                            renewed.lease_token.as_str(),
1428                        ],
1429                    )
1430                    .map_err(process_sqlite_error)?;
1431                    Ok(renewed)
1432                })()))
1433            })
1434            .await
1435            .map_err(process_sqlite_error)?
1436    }
1437
1438    async fn get_process_lease(
1439        &self,
1440        process_id: &str,
1441    ) -> Result<Option<ProcessLease>, lash_core::PluginError> {
1442        let process_id = process_id.to_string();
1443        self.conn
1444            .call(move |conn| Ok(Self::load_process_lease_conn(conn, &process_id)))
1445            .await
1446            .map_err(process_sqlite_error)?
1447    }
1448
1449    async fn complete_process_lease(
1450        &self,
1451        completion: &ProcessLeaseCompletion,
1452    ) -> Result<(), lash_core::PluginError> {
1453        let process_id = completion.process_id.clone();
1454        let lease_token = completion.lease_token.clone();
1455        self.conn
1456            .call(move |conn| {
1457                conn.execute(
1458                    "UPDATE process_leases
1459                     SET lease_owner_id = NULL,
1460                         lease_token = NULL,
1461                         lease_claimed_at_ms = 0,
1462                         lease_expires_at_ms = 0
1463                     WHERE process_id = ?1 AND lease_token = ?2",
1464                    params![process_id, lease_token],
1465                )
1466            })
1467            .await
1468            .map_err(process_sqlite_error)?;
1469        Ok(())
1470    }
1471
1472    async fn prune_terminal_processes(
1473        &self,
1474        cutoff_epoch_ms: u64,
1475    ) -> Result<ProcessPruneReport, lash_core::PluginError> {
1476        let cutoff = cutoff_epoch_ms as i64;
1477        self.conn
1478            .write_flow(move |tx| {
1479                Ok(tx_outcome((|| {
1480                    // The child deletes select from `processes` (still present
1481                    // until the final delete) so they resolve the same terminal,
1482                    // older-than-cutoff set. Delete children first, then the
1483                    // parent rows.
1484                    const SELECTOR: &str = "process_id IN (
1485                        SELECT process_id FROM processes
1486                        WHERE status != 'running' AND updated_at_ms < ?1
1487                    )";
1488                    let pruned_events = tx
1489                        .execute(
1490                            &format!("DELETE FROM process_events WHERE {SELECTOR}"),
1491                            params![cutoff],
1492                        )
1493                        .map_err(process_sqlite_error)?;
1494                    tx.execute(
1495                        &format!("DELETE FROM process_wake_acks WHERE {SELECTOR}"),
1496                        params![cutoff],
1497                    )
1498                    .map_err(process_sqlite_error)?;
1499                    tx.execute(
1500                        &format!("DELETE FROM process_handle_grants WHERE {SELECTOR}"),
1501                        params![cutoff],
1502                    )
1503                    .map_err(process_sqlite_error)?;
1504                    tx.execute(
1505                        &format!("DELETE FROM process_leases WHERE {SELECTOR}"),
1506                        params![cutoff],
1507                    )
1508                    .map_err(process_sqlite_error)?;
1509                    let pruned_processes = tx
1510                        .execute(
1511                            "DELETE FROM processes
1512                             WHERE status != 'running' AND updated_at_ms < ?1",
1513                            params![cutoff],
1514                        )
1515                        .map_err(process_sqlite_error)?;
1516                    Ok(ProcessPruneReport {
1517                        pruned_processes,
1518                        pruned_events,
1519                    })
1520                })()))
1521            })
1522            .await
1523            .map_err(process_sqlite_error)?
1524    }
1525}
1526
1527/// Loud, stable error for a superseded or expired process lease.
1528fn process_lease_expired(process_id: &str) -> lash_core::PluginError {
1529    lash_core::PluginError::Session(format!(
1530        "process lease for `{process_id}` is missing or expired"
1531    ))
1532}
1533
1534fn process_lease_owner_from_columns(
1535    owner_id: String,
1536    incarnation_id: Option<String>,
1537    liveness_json: Option<String>,
1538) -> LeaseOwnerIdentity {
1539    LeaseOwnerIdentity {
1540        incarnation_id: incarnation_id.unwrap_or_else(|| owner_id.clone()),
1541        owner_id,
1542        liveness: liveness_json
1543            .as_deref()
1544            .and_then(|json| serde_json::from_str(json).ok())
1545            .unwrap_or(LeaseOwnerLiveness::Opaque),
1546    }
1547}
1548
1549fn encode_process_lease_liveness(
1550    liveness: &LeaseOwnerLiveness,
1551) -> Result<String, lash_core::PluginError> {
1552    serde_json::to_string(liveness).map_err(|err| {
1553        lash_core::PluginError::Session(format!("failed to encode process lease liveness: {err}"))
1554    })
1555}
1556
1557fn process_external_ref_conflict(
1558    process_id: &str,
1559    existing: &ProcessExternalRef,
1560    new: &ProcessExternalRef,
1561) -> lash_core::PluginError {
1562    lash_core::PluginError::Session(format!(
1563        "process `{process_id}` external ref conflict: existing {existing:?}, new {new:?}"
1564    ))
1565}