1use super::*;
26
27fn process_status_label(record: &ProcessRecord) -> &'static str {
28 record.status.label()
29}
30
31impl SqliteProcessRegistry {
32 pub async fn open(path: &Path) -> tokio_rusqlite::Result<Self> {
33 let conn = SqliteConnection::open(path).await?;
34 ensure_process_schema(&conn).await?;
35 apply_pragmas(&conn, StoreBacking::File).await?;
36 Ok(Self {
37 conn,
38 notify: tokio::sync::Notify::new(),
39 })
40 }
41
42 pub async fn memory() -> tokio_rusqlite::Result<Self> {
43 let conn = SqliteConnection::open_in_memory().await?;
44 ensure_process_schema(&conn).await?;
45 apply_pragmas(&conn, StoreBacking::Memory).await?;
46 Ok(Self {
47 conn,
48 notify: tokio::sync::Notify::new(),
49 })
50 }
51
52 fn load_process_conn(
53 conn: &Connection,
54 process_id: &str,
55 ) -> Result<Option<ProcessRecord>, lash_core::PluginError> {
56 let json: Option<String> = conn
57 .query_row(
58 "SELECT record_json FROM processes WHERE process_id = ?1",
59 params![process_id],
60 |row| row.get(0),
61 )
62 .optional()
63 .map_err(process_sqlite_error)?;
64 json.map(|json| serde_json::from_str(&json).map_err(process_decode_error))
65 .transpose()
66 }
67
68 fn save_process_conn(
69 conn: &Connection,
70 record: &ProcessRecord,
71 ) -> Result<(), lash_core::PluginError> {
72 conn.execute(
73 "UPDATE processes
74 SET updated_at_ms = ?2, status = ?3, record_json = ?4
75 WHERE process_id = ?1",
76 params![
77 record.id.as_str(),
78 record.updated_at_ms as i64,
79 process_status_label(record),
80 process_encode_json(record)?
81 ],
82 )
83 .map_err(process_sqlite_error)?;
84 Ok(())
85 }
86
87 fn load_event_by_key_conn(
88 conn: &Connection,
89 process_id: &str,
90 replay_key: &str,
91 ) -> Result<Option<(String, ProcessEvent)>, lash_core::PluginError> {
92 let row: Option<(String, String)> = conn
93 .query_row(
94 "SELECT payload_hash, event_json
95 FROM process_events
96 WHERE process_id = ?1 AND idempotency_key = ?2",
97 params![process_id, replay_key],
98 |row| Ok((row.get(0)?, row.get(1)?)),
99 )
100 .optional()
101 .map_err(process_sqlite_error)?;
102 row.map(|(hash, json)| {
103 serde_json::from_str(&json)
104 .map(|event| (hash, event))
105 .map_err(process_decode_error)
106 })
107 .transpose()
108 }
109
110 fn load_process_lease_conn(
111 conn: &Connection,
112 process_id: &str,
113 ) -> Result<Option<ProcessLease>, lash_core::PluginError> {
114 conn.query_row(
115 "SELECT lease_owner_id, lease_token, lease_fencing_token,
116 lease_claimed_at_ms, lease_expires_at_ms
117 FROM process_leases
118 WHERE process_id = ?1",
119 params![process_id],
120 |row| {
121 let owner_id: Option<String> = row.get(0)?;
122 let lease_token: Option<String> = row.get(1)?;
123 let (Some(owner_id), Some(lease_token)) = (owner_id, lease_token) else {
124 return Ok(None);
125 };
126 Ok(Some(ProcessLease {
127 schema_version: PROCESS_LEASE_SCHEMA_VERSION,
128 process_id: process_id.to_string(),
129 owner_id,
130 lease_token,
131 fencing_token: row.get::<_, i64>(2)? as u64,
132 claimed_at_epoch_ms: row.get::<_, i64>(3)? as u64,
133 expires_at_epoch_ms: row.get::<_, i64>(4)? as u64,
134 }))
135 },
136 )
137 .optional()
138 .map(|lease| lease.flatten())
139 .map_err(process_sqlite_error)
140 }
141
142 fn list_grants_for_scope_conn(
143 conn: &Connection,
144 session_scope: &SessionScope,
145 live_only: bool,
146 ) -> Result<Vec<ProcessHandleGrantEntry>, lash_core::PluginError> {
147 let session_scope_id = session_scope.id();
148 let status_clause = if live_only {
149 "AND p.status = 'running'"
150 } else {
151 ""
152 };
153 let mut stmt = conn
154 .prepare(&format!(
155 "SELECT g.process_id, g.descriptor_json, p.record_json
156 FROM process_handle_grants g
157 JOIN processes p ON p.process_id = g.process_id
158 WHERE g.scope_id = ?1 {status_clause}
159 ORDER BY g.process_id ASC"
160 ))
161 .map_err(process_sqlite_error)?;
162 let rows = stmt
163 .query_map(params![session_scope_id.as_str()], |row| {
164 Ok((
165 row.get::<_, String>(0)?,
166 row.get::<_, String>(1)?,
167 row.get::<_, String>(2)?,
168 ))
169 })
170 .map_err(process_sqlite_error)?;
171 let mut entries = Vec::new();
172 for row in rows {
173 let (process_id, descriptor_json, record_json) = row.map_err(process_sqlite_error)?;
174 let descriptor: ProcessHandleDescriptor =
175 serde_json::from_str(&descriptor_json).map_err(process_decode_error)?;
176 let record: ProcessRecord =
177 serde_json::from_str(&record_json).map_err(process_decode_error)?;
178 entries.push((
179 ProcessHandleGrant {
180 session_id: session_scope.session_id.clone(),
181 process_id,
182 descriptor,
183 },
184 record,
185 ));
186 }
187 Ok(entries)
188 }
189}
190
191fn tx_outcome<T>(
196 result: Result<T, lash_core::PluginError>,
197) -> TxOutcome<Result<T, lash_core::PluginError>> {
198 match result {
199 Ok(value) => TxOutcome::Commit(Ok(value)),
200 Err(err) => TxOutcome::Rollback(Err(err)),
201 }
202}
203
204#[async_trait::async_trait]
205impl ProcessRegistry for SqliteProcessRegistry {
206 fn durability_tier(&self) -> DurabilityTier {
207 DurabilityTier::Durable
208 }
209
210 async fn register_process(
211 &self,
212 registration: ProcessRegistration,
213 ) -> Result<ProcessRecord, lash_core::PluginError> {
214 let (registration, registration_hash) = prepare_process_registration(registration)?;
215 let record = self
216 .conn
217 .write_flow(move |tx| {
218 Ok(tx_outcome((|| {
219 if let Some(existing) = Self::load_process_conn(tx, ®istration.id)? {
220 if existing.registration_hash == registration_hash {
221 return Ok(existing);
222 }
223 return Err(lash_core::PluginError::Session(format!(
224 "process `{}` registration hash conflict: existing {}, new {}",
225 registration.id, existing.registration_hash, registration_hash
226 )));
227 }
228 let now = current_epoch_ms();
229 let record = ProcessRecord::from_prepared_registration(
230 registration,
231 registration_hash,
232 now,
233 );
234 let originator_scope_id = record.originator_scope_id();
235 tx.execute(
236 "INSERT INTO processes (
237 process_id, registration_hash, owner_scope_id,
238 created_at_ms, updated_at_ms, status, record_json
239 )
240 VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7)",
241 params![
242 record.id.as_str(),
243 record.registration_hash.as_str(),
244 originator_scope_id.as_str(),
245 record.created_at_ms as i64,
246 record.updated_at_ms as i64,
247 process_status_label(&record),
248 process_encode_json(&record)?,
249 ],
250 )
251 .map_err(process_sqlite_error)?;
252 Ok(record)
253 })()))
254 })
255 .await
256 .map_err(process_sqlite_error)??;
257 self.notify.notify_waiters();
258 Ok(record)
259 }
260
261 async fn set_external_ref(
262 &self,
263 process_id: &str,
264 external_ref: ProcessExternalRef,
265 ) -> Result<ProcessRecord, lash_core::PluginError> {
266 let process_id = process_id.to_string();
267 let (record, changed) = self
268 .conn
269 .write_flow(move |tx| {
270 Ok(tx_outcome((|| {
271 let mut record =
272 Self::load_process_conn(tx, &process_id)?.ok_or_else(|| {
273 lash_core::PluginError::Session(format!(
274 "unknown process `{process_id}`"
275 ))
276 })?;
277 if let Some(existing) = &record.external_ref {
278 if existing == &external_ref {
279 return Ok((record, false));
280 }
281 return Err(process_external_ref_conflict(
282 &process_id,
283 existing,
284 &external_ref,
285 ));
286 }
287 record.external_ref = Some(external_ref);
288 record.updated_at_ms = current_epoch_ms();
289 Self::save_process_conn(tx, &record)?;
290 Ok((record, true))
291 })()))
292 })
293 .await
294 .map_err(process_sqlite_error)??;
295 if changed {
296 self.notify.notify_waiters();
297 }
298 Ok(record)
299 }
300
301 async fn grant_handle(
302 &self,
303 session_scope: &SessionScope,
304 process_id: &str,
305 descriptor: ProcessHandleDescriptor,
306 ) -> Result<ProcessHandleGrant, lash_core::PluginError> {
307 let session_scope = session_scope.clone();
308 let process_id = process_id.to_string();
309 self.conn
310 .write_flow(move |tx| {
311 Ok(tx_outcome((|| {
312 let session_scope_id = session_scope.id();
313 if Self::load_process_conn(tx, &process_id)?.is_none() {
314 return Err(lash_core::PluginError::Session(format!(
315 "unknown process `{process_id}`"
316 )));
317 }
318 tx.execute(
319 "INSERT INTO process_handle_grants (session_id, scope_id, process_id, descriptor_json)
320 VALUES (?1, ?2, ?3, ?4)
321 ON CONFLICT(scope_id, process_id) DO UPDATE SET
322 session_id = excluded.session_id,
323 descriptor_json = excluded.descriptor_json",
324 params![
325 session_scope.session_id.as_str(),
326 session_scope_id.as_str(),
327 process_id.as_str(),
328 process_encode_json(&descriptor)?
329 ],
330 )
331 .map_err(process_sqlite_error)?;
332 Ok(ProcessHandleGrant {
333 session_id: session_scope.session_id.clone(),
334 process_id: process_id.clone(),
335 descriptor,
336 })
337 })()))
338 })
339 .await
340 .map_err(process_sqlite_error)?
341 }
342
343 async fn revoke_handle(
344 &self,
345 session_scope: &SessionScope,
346 process_id: &str,
347 ) -> Result<(), lash_core::PluginError> {
348 let session_scope_id = session_scope.id().as_str().to_string();
349 let process_id = process_id.to_string();
350 self.conn
351 .call(move |conn| {
352 conn.execute(
353 "DELETE FROM process_handle_grants WHERE scope_id = ?1 AND process_id = ?2",
354 params![session_scope_id, process_id],
355 )
356 })
357 .await
358 .map_err(process_sqlite_error)?;
359 Ok(())
360 }
361
362 async fn transfer_handle_grants(
363 &self,
364 from_scope: &SessionScope,
365 to_scope: &SessionScope,
366 process_ids: &[String],
367 ) -> Result<(), lash_core::PluginError> {
368 let from_scope = from_scope.clone();
369 let to_scope = to_scope.clone();
370 let process_ids = process_ids.to_vec();
371 self.conn
372 .write_flow(move |tx| {
373 Ok(tx_outcome((|| {
374 let from_scope_id = from_scope.id();
375 let to_scope_id = to_scope.id();
376 for process_id in &process_ids {
377 let descriptor_json: Option<String> = tx
378 .query_row(
379 "SELECT descriptor_json
380 FROM process_handle_grants
381 WHERE scope_id = ?1 AND process_id = ?2",
382 params![from_scope_id.as_str(), process_id.as_str()],
383 |row| row.get(0),
384 )
385 .optional()
386 .map_err(process_sqlite_error)?;
387 let Some(descriptor_json) = descriptor_json else {
388 return Err(lash_core::PluginError::Session(format!(
389 "process handle `{process_id}` is not granted to session `{}`",
390 from_scope.session_id
391 )));
392 };
393 tx.execute(
394 "DELETE FROM process_handle_grants
395 WHERE scope_id = ?1 AND process_id = ?2",
396 params![from_scope_id.as_str(), process_id.as_str()],
397 )
398 .map_err(process_sqlite_error)?;
399 tx.execute(
400 "INSERT INTO process_handle_grants (session_id, scope_id, process_id, descriptor_json)
401 VALUES (?1, ?2, ?3, ?4)
402 ON CONFLICT(scope_id, process_id) DO UPDATE SET
403 session_id = excluded.session_id,
404 descriptor_json = excluded.descriptor_json",
405 params![
406 to_scope.session_id.as_str(),
407 to_scope_id.as_str(),
408 process_id.as_str(),
409 descriptor_json
410 ],
411 )
412 .map_err(process_sqlite_error)?;
413 }
414 Ok(())
415 })()))
416 })
417 .await
418 .map_err(process_sqlite_error)?
419 }
420
421 async fn list_handle_grants(
422 &self,
423 session_scope: &SessionScope,
424 ) -> Result<Vec<ProcessHandleGrantEntry>, lash_core::PluginError> {
425 let session_scope = session_scope.clone();
426 self.conn
427 .call(move |conn| {
428 Ok(Self::list_grants_for_scope_conn(
429 conn,
430 &session_scope,
431 false,
432 ))
433 })
434 .await
435 .map_err(process_sqlite_error)?
436 }
437
438 async fn list_live_handle_grants(
439 &self,
440 session_scope: &SessionScope,
441 ) -> Result<Vec<ProcessHandleGrantEntry>, lash_core::PluginError> {
442 let session_scope = session_scope.clone();
443 self.conn
444 .call(move |conn| Ok(Self::list_grants_for_scope_conn(conn, &session_scope, true)))
445 .await
446 .map_err(process_sqlite_error)?
447 }
448
449 async fn has_handle_grant(
450 &self,
451 session_scope: &SessionScope,
452 process_id: &str,
453 ) -> Result<bool, lash_core::PluginError> {
454 let session_scope_id = session_scope.id().as_str().to_string();
455 let process_id = process_id.to_string();
456 self.conn
457 .call(move |conn| {
458 let exists = conn
459 .query_row(
460 "SELECT 1
461 FROM process_handle_grants g
462 JOIN processes p ON p.process_id = g.process_id
463 WHERE g.scope_id = ?1 AND g.process_id = ?2
464 LIMIT 1",
465 params![session_scope_id, process_id],
466 |_| Ok(()),
467 )
468 .optional()?
469 .is_some();
470 Ok(exists)
471 })
472 .await
473 .map_err(process_sqlite_error)
474 }
475
476 async fn handle_grants_for_process(
477 &self,
478 process_id: &str,
479 ) -> Result<Vec<ProcessHandleGrant>, lash_core::PluginError> {
480 let process_id = process_id.to_string();
481 self.conn
482 .call(move |conn| {
483 Ok((|| {
484 if Self::load_process_conn(conn, &process_id)?.is_none() {
485 return Err(lash_core::PluginError::Session(format!(
486 "unknown process `{process_id}`"
487 )));
488 }
489 let mut stmt = conn
490 .prepare(
491 "SELECT session_id, descriptor_json
492 FROM process_handle_grants
493 WHERE process_id = ?1
494 ORDER BY session_id ASC, scope_id ASC",
495 )
496 .map_err(process_sqlite_error)?;
497 let rows = stmt
498 .query_map(params![process_id], |row| {
499 Ok((row.get::<_, String>(0)?, row.get::<_, String>(1)?))
500 })
501 .map_err(process_sqlite_error)?;
502 let mut grants = Vec::new();
503 for row in rows {
504 let (session_id, descriptor_json) = row.map_err(process_sqlite_error)?;
505 let descriptor: ProcessHandleDescriptor =
506 serde_json::from_str(&descriptor_json).map_err(process_decode_error)?;
507 grants.push(ProcessHandleGrant {
508 session_id,
509 process_id: process_id.clone(),
510 descriptor,
511 });
512 }
513 Ok(grants)
514 })())
515 })
516 .await
517 .map_err(process_sqlite_error)?
518 }
519
520 async fn delete_session_process_state(
521 &self,
522 session_id: &str,
523 ) -> Result<lash_core::ProcessSessionDeleteReport, lash_core::PluginError> {
524 let session_id_owned = session_id.to_string();
525 let (
526 revoked_handle_count,
527 deleted_wake_count,
528 mut orphaned_process_ids,
529 mut preserved_process_ids,
530 ) = self
531 .conn
532 .write_flow(move |tx| {
533 Ok(tx_outcome((|| {
534 let session_id = session_id_owned;
535 let removed = {
536 let mut stmt = tx
537 .prepare(
538 "SELECT g.process_id, p.record_json
539 FROM process_handle_grants g
540 JOIN processes p ON p.process_id = g.process_id
541 WHERE g.session_id = ?1
542 ORDER BY g.process_id ASC",
543 )
544 .map_err(process_sqlite_error)?;
545 let rows = stmt
546 .query_map(params![session_id], |row| {
547 Ok((row.get::<_, String>(0)?, row.get::<_, String>(1)?))
548 })
549 .map_err(process_sqlite_error)?;
550 let mut removed = Vec::new();
551 for row in rows {
552 let (process_id, record_json) = row.map_err(process_sqlite_error)?;
553 let record: ProcessRecord =
554 serde_json::from_str(&record_json).map_err(process_decode_error)?;
555 removed.push((process_id, record));
556 }
557 removed
558 };
559
560 let deleted_wake_count = 0;
565 let revoked_handle_count = tx
566 .execute(
567 "DELETE FROM process_handle_grants WHERE session_id = ?1",
568 params![session_id],
569 )
570 .map_err(process_sqlite_error)?;
571 let mut orphaned_process_ids = Vec::new();
572 let mut preserved_process_ids = Vec::new();
573 for (process_id, record) in removed {
574 if record.is_terminal() {
575 continue;
576 }
577 let remaining_grants: i64 = tx
578 .query_row(
579 "SELECT COUNT(*) FROM process_handle_grants WHERE process_id = ?1",
580 params![process_id],
581 |row| row.get(0),
582 )
583 .map_err(process_sqlite_error)?;
584 if remaining_grants == 0 {
585 orphaned_process_ids.push(process_id);
586 } else {
587 preserved_process_ids.push(process_id);
588 }
589 }
590 let wake_targeted = {
591 let mut stmt = tx
592 .prepare("SELECT process_id, record_json FROM processes ORDER BY process_id ASC")
593 .map_err(process_sqlite_error)?;
594 let rows = stmt
595 .query_map([], |row| {
596 Ok((row.get::<_, String>(0)?, row.get::<_, String>(1)?))
597 })
598 .map_err(process_sqlite_error)?;
599 let mut records = Vec::new();
600 for row in rows {
601 let (process_id, record_json) = row.map_err(process_sqlite_error)?;
602 let record: ProcessRecord =
603 serde_json::from_str(&record_json).map_err(process_decode_error)?;
604 records.push((process_id, record));
605 }
606 records
607 };
608 for (_process_id, mut record) in wake_targeted {
609 if record.clear_wake_target_for_session(&session_id) {
610 Self::save_process_conn(tx, &record)?;
611 }
612 }
613 Ok((
614 revoked_handle_count,
615 deleted_wake_count,
616 orphaned_process_ids,
617 preserved_process_ids,
618 ))
619 })()))
620 })
621 .await
622 .map_err(process_sqlite_error)??;
623 orphaned_process_ids.sort();
624 orphaned_process_ids.dedup();
625 preserved_process_ids.sort();
626 preserved_process_ids.dedup();
627 Ok(lash_core::ProcessSessionDeleteReport {
628 session_id: session_id.to_string(),
629 revoked_handle_count,
630 deleted_wake_count,
631 orphaned_process_ids,
632 preserved_process_ids,
633 })
634 }
635
636 async fn append_event(
637 &self,
638 process_id: &str,
639 request: ProcessEventAppendRequest,
640 ) -> Result<ProcessEventAppendResult, lash_core::PluginError> {
641 let process_id = process_id.to_string();
642 let (result, appended) = self
643 .conn
644 .write_flow(move |tx| {
645 Ok(tx_outcome((|| {
646 let mut record =
647 Self::load_process_conn(tx, &process_id)?.ok_or_else(|| {
648 lash_core::PluginError::Session(format!(
649 "unknown process `{process_id}`"
650 ))
651 })?;
652 let replay_lookup = if let Some(replay_key) =
653 request.replay.as_ref().map(|replay| replay.key.as_str())
654 {
655 Self::load_event_by_key_conn(tx, &process_id, replay_key)?
656 } else {
657 None
658 };
659 let sequence = tx
660 .query_row(
661 "SELECT COALESCE(MAX(sequence), 0) + 1 FROM process_events WHERE process_id = ?1",
662 params![process_id],
663 |row| row.get::<_, i64>(0),
664 )
665 .map_err(process_sqlite_error)? as u64;
666 let occurred_at_ms = current_epoch_ms();
667 let prepared = prepare_process_event_append(
668 &record,
669 request,
670 sequence,
671 replay_lookup,
672 occurred_at_ms,
673 )?;
674 match prepared {
675 lash_core::ProcessEventAppendPlan::Replay {
676 event,
677 repair_status,
678 wake_delivery,
679 occurred_at_ms,
680 } => {
681 let repaired = if let Some(status) = repair_status {
682 lash_core::apply_process_status_projection(
683 &mut record,
684 status,
685 occurred_at_ms,
686 );
687 Self::save_process_conn(tx, &record)?;
688 true
689 } else {
690 false
691 };
692 Ok((
693 ProcessEventAppendResult {
694 event,
695 wake_delivery,
696 },
697 repaired,
698 ))
699 }
700 lash_core::ProcessEventAppendPlan::Insert {
701 event,
702 payload_hash,
703 status_update,
704 wake_delivery,
705 occurred_at_ms,
706 } => {
707 tx.execute(
708 "INSERT INTO process_events (
709 process_id, sequence, event_type, payload_hash, idempotency_key,
710 occurred_at_ms, event_json
711 )
712 VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7)",
713 params![
714 process_id,
715 sequence as i64,
716 event.event_type.as_str(),
717 payload_hash.as_str(),
718 event.invocation.replay_key(),
719 occurred_at_ms as i64,
720 process_encode_json(&event)?,
721 ],
722 )
723 .map_err(process_sqlite_error)?;
724 if let Some(status) = status_update {
725 lash_core::apply_process_status_projection(
726 &mut record,
727 status,
728 occurred_at_ms,
729 );
730 } else {
731 record.updated_at_ms = occurred_at_ms;
732 }
733 Self::save_process_conn(tx, &record)?;
734 Ok((
735 ProcessEventAppendResult {
736 event,
737 wake_delivery,
738 },
739 true,
740 ))
741 }
742 }
743 })()))
744 })
745 .await
746 .map_err(process_sqlite_error)??;
747 if appended {
748 self.notify.notify_waiters();
749 }
750 Ok(result)
751 }
752
753 async fn events_after(
754 &self,
755 process_id: &str,
756 after_sequence: u64,
757 ) -> Result<Vec<ProcessEvent>, lash_core::PluginError> {
758 let process_id = process_id.to_string();
759 self.conn
760 .call(move |conn| {
761 Ok((|| {
762 if Self::load_process_conn(conn, &process_id)?.is_none() {
763 return Err(lash_core::PluginError::Session(format!(
764 "unknown process `{process_id}`"
765 )));
766 }
767 let mut stmt = conn
768 .prepare(
769 "SELECT event_json FROM process_events
770 WHERE process_id = ?1 AND sequence > ?2
771 ORDER BY sequence ASC",
772 )
773 .map_err(process_sqlite_error)?;
774 let rows = stmt
775 .query_map(params![process_id, after_sequence as i64], |row| {
776 row.get::<_, String>(0)
777 })
778 .map_err(process_sqlite_error)?;
779 let mut events = Vec::new();
780 for row in rows {
781 events.push(
782 serde_json::from_str(&row.map_err(process_sqlite_error)?)
783 .map_err(process_decode_error)?,
784 );
785 }
786 Ok(events)
787 })())
788 })
789 .await
790 .map_err(process_sqlite_error)?
791 }
792
793 async fn count_events_through(
794 &self,
795 process_id: &str,
796 event_type: &str,
797 up_to_sequence: u64,
798 ) -> Result<u64, lash_core::PluginError> {
799 let process_id = process_id.to_string();
800 let event_type = event_type.to_string();
801 self.conn
802 .call(move |conn| {
803 Ok((|| {
804 if Self::load_process_conn(conn, &process_id)?.is_none() {
805 return Err(lash_core::PluginError::Session(format!(
806 "unknown process `{process_id}`"
807 )));
808 }
809 conn.query_row(
810 "SELECT COUNT(*) FROM process_events
811 WHERE process_id = ?1 AND event_type = ?2 AND sequence <= ?3",
812 params![process_id, event_type, up_to_sequence as i64],
813 |row| row.get::<_, i64>(0),
814 )
815 .map(|count| count as u64)
816 .map_err(process_sqlite_error)
817 })())
818 })
819 .await
820 .map_err(process_sqlite_error)?
821 }
822
823 async fn recent_events(
824 &self,
825 process_id: &str,
826 limit: usize,
827 ) -> Result<Vec<ProcessEvent>, lash_core::PluginError> {
828 let process_id = process_id.to_string();
829 self.conn
830 .call(move |conn| {
831 Ok((|| {
832 if Self::load_process_conn(conn, &process_id)?.is_none() {
833 return Err(lash_core::PluginError::Session(format!(
834 "unknown process `{process_id}`"
835 )));
836 }
837 let mut stmt = conn
838 .prepare(
839 "SELECT event_json FROM process_events
840 WHERE process_id = ?1
841 ORDER BY sequence DESC
842 LIMIT ?2",
843 )
844 .map_err(process_sqlite_error)?;
845 let rows = stmt
846 .query_map(params![process_id, limit as i64], |row| {
847 row.get::<_, String>(0)
848 })
849 .map_err(process_sqlite_error)?;
850 let mut events: Vec<ProcessEvent> = Vec::new();
851 for row in rows {
852 events.push(
853 serde_json::from_str(&row.map_err(process_sqlite_error)?)
854 .map_err(process_decode_error)?,
855 );
856 }
857 events.reverse();
858 Ok(events)
859 })())
860 })
861 .await
862 .map_err(process_sqlite_error)?
863 }
864
865 async fn wake_events_after(
866 &self,
867 process_id: &str,
868 after_sequence: u64,
869 ) -> Result<Vec<ProcessEvent>, lash_core::PluginError> {
870 let acked: std::collections::HashSet<u64> = {
871 let process_id = process_id.to_string();
872 self.conn
873 .call(move |conn| {
874 Ok(
875 (|| -> Result<std::collections::HashSet<u64>, lash_core::PluginError> {
876 let mut stmt = conn
877 .prepare(
878 "SELECT sequence FROM process_wake_acks WHERE process_id = ?1",
879 )
880 .map_err(process_sqlite_error)?;
881 let rows = stmt
882 .query_map(params![process_id], |row| row.get::<_, i64>(0))
883 .map_err(process_sqlite_error)?;
884 let mut set = std::collections::HashSet::new();
885 for row in rows {
886 set.insert(row.map_err(process_sqlite_error)? as u64);
887 }
888 Ok(set)
889 })(),
890 )
891 })
892 .await
893 .map_err(process_sqlite_error)??
894 };
895 Ok(self
896 .events_after(process_id, after_sequence)
897 .await?
898 .into_iter()
899 .filter(|event| event.semantics.wake.is_some() && !acked.contains(&event.sequence))
900 .collect())
901 }
902
903 async fn wait_event_after(
904 &self,
905 process_id: &str,
906 event_type: &str,
907 after_sequence: u64,
908 ) -> Result<ProcessEvent, lash_core::PluginError> {
909 loop {
910 if let Some(event) = self
911 .events_after(process_id, after_sequence)
912 .await?
913 .into_iter()
914 .find(|event| event.event_type == event_type)
915 {
916 return Ok(event);
917 }
918 tokio::select! {
919 _ = self.notify.notified() => {}
920 _ = tokio::time::sleep(Duration::from_millis(50)) => {}
921 }
922 }
923 }
924
925 async fn await_process(
926 &self,
927 process_id: &str,
928 ) -> Result<ProcessAwaitOutput, lash_core::PluginError> {
929 loop {
930 let record = self.get_process(process_id).await.ok_or_else(|| {
931 lash_core::PluginError::Session(format!("unknown process `{process_id}`"))
932 })?;
933 if let Some(await_output) = record.status.await_output() {
934 return Ok(await_output.clone());
935 }
936 tokio::select! {
937 _ = self.notify.notified() => {}
938 _ = tokio::time::sleep(Duration::from_millis(50)) => {}
939 }
940 }
941 }
942
943 async fn complete_process(
944 &self,
945 process_id: &str,
946 await_output: ProcessAwaitOutput,
947 ) -> Result<ProcessRecord, lash_core::PluginError> {
948 let event_type = match await_output.terminal_state() {
949 lash_core::ProcessTerminalState::Completed => "process.completed",
950 lash_core::ProcessTerminalState::Failed => "process.failed",
951 lash_core::ProcessTerminalState::Cancelled => "process.cancelled",
952 };
953 self.append_event(
954 process_id,
955 ProcessEventAppendRequest::new(
956 event_type,
957 serde_json::json!({ "await_output": await_output }),
958 )
959 .with_replay_key(format!("process:{process_id}:terminal:{event_type}")),
960 )
961 .await?;
962 self.get_process(process_id).await.ok_or_else(|| {
963 lash_core::PluginError::Session(format!(
964 "unknown process `{process_id}` after terminal event"
965 ))
966 })
967 }
968
969 async fn set_process_wait(
970 &self,
971 process_id: &str,
972 wait: lash_core::WaitState,
973 ) -> Result<ProcessRecord, lash_core::PluginError> {
974 let process_id = process_id.to_string();
975 self.conn
976 .write_flow(move |tx| {
977 Ok(tx_outcome((|| {
978 let mut record =
979 Self::load_process_conn(tx, &process_id)?.ok_or_else(|| {
980 lash_core::PluginError::Session(format!(
981 "unknown process `{process_id}`"
982 ))
983 })?;
984 if record.is_terminal() {
985 return Err(lash_core::PluginError::Session(format!(
986 "terminal process `{process_id}` cannot enter a wait state"
987 )));
988 }
989 record.wait = Some(wait);
990 record.updated_at_ms = current_epoch_ms();
991 Self::save_process_conn(tx, &record)?;
992 Ok(record)
993 })()))
994 })
995 .await
996 .map_err(process_sqlite_error)?
997 }
998
999 async fn clear_process_wait(
1000 &self,
1001 process_id: &str,
1002 ) -> Result<ProcessRecord, lash_core::PluginError> {
1003 let process_id = process_id.to_string();
1004 self.conn
1005 .write_flow(move |tx| {
1006 Ok(tx_outcome((|| {
1007 let mut record =
1008 Self::load_process_conn(tx, &process_id)?.ok_or_else(|| {
1009 lash_core::PluginError::Session(format!(
1010 "unknown process `{process_id}`"
1011 ))
1012 })?;
1013 record.wait = None;
1014 record.updated_at_ms = current_epoch_ms();
1015 Self::save_process_conn(tx, &record)?;
1016 Ok(record)
1017 })()))
1018 })
1019 .await
1020 .map_err(process_sqlite_error)?
1021 }
1022
1023 async fn get_process(&self, process_id: &str) -> Option<ProcessRecord> {
1024 let process_id = process_id.to_string();
1025 self.conn
1026 .call(move |conn| Ok(Self::load_process_conn(conn, &process_id).ok().flatten()))
1027 .await
1028 .ok()
1029 .flatten()
1030 }
1031
1032 async fn list_processes(
1033 &self,
1034 filter: &lash_core::ProcessListFilter,
1035 ) -> Result<Vec<ProcessRecord>, lash_core::PluginError> {
1036 let filter = filter.clone();
1037 self.conn
1038 .call(move |conn| {
1039 Ok((|| {
1040 let mut stmt = conn
1041 .prepare(
1042 "SELECT record_json FROM processes
1043 ORDER BY process_id ASC",
1044 )
1045 .map_err(process_sqlite_error)?;
1046 let rows = stmt
1047 .query_map([], |row| row.get::<_, String>(0))
1048 .map_err(process_sqlite_error)?;
1049 let mut records = Vec::new();
1050 for row in rows {
1051 let record: ProcessRecord =
1052 serde_json::from_str(&row.map_err(process_sqlite_error)?)
1053 .map_err(process_decode_error)?;
1054 if filter.matches_record(&record) {
1055 records.push(record);
1056 }
1057 }
1058 Ok(records)
1059 })())
1060 })
1061 .await
1062 .map_err(process_sqlite_error)?
1063 }
1064
1065 async fn ack_wake(
1066 &self,
1067 process_id: &str,
1068 sequence: u64,
1069 ) -> Result<(), lash_core::PluginError> {
1070 let process_id = process_id.to_string();
1071 self.conn
1072 .call(move |conn| {
1073 Ok((|| {
1074 if Self::load_process_conn(conn, &process_id)?.is_none() {
1075 return Err(lash_core::PluginError::Session(format!(
1076 "unknown process `{process_id}`"
1077 )));
1078 }
1079 conn.execute(
1080 "INSERT OR IGNORE INTO process_wake_acks (process_id, sequence) VALUES (?1, ?2)",
1081 params![process_id, sequence as i64],
1082 )
1083 .map_err(process_sqlite_error)?;
1084 Ok(())
1085 })())
1086 })
1087 .await
1088 .map_err(process_sqlite_error)?
1089 }
1090
1091 async fn list_non_terminal(&self) -> Result<Vec<ProcessRecord>, lash_core::PluginError> {
1092 self.conn
1093 .call(move |conn| {
1094 Ok((|| {
1095 let mut stmt = conn
1096 .prepare(
1097 "SELECT record_json FROM processes
1098 WHERE status = 'running'
1099 ORDER BY process_id ASC",
1100 )
1101 .map_err(process_sqlite_error)?;
1102 let rows = stmt
1103 .query_map([], |row| row.get::<_, String>(0))
1104 .map_err(process_sqlite_error)?;
1105 let mut records = Vec::new();
1106 for row in rows {
1107 let record: ProcessRecord =
1108 serde_json::from_str(&row.map_err(process_sqlite_error)?)
1109 .map_err(process_decode_error)?;
1110 records.push(record);
1111 }
1112 Ok(records)
1113 })())
1114 })
1115 .await
1116 .map_err(process_sqlite_error)?
1117 }
1118
1119 async fn claim_process_lease(
1120 &self,
1121 process_id: &str,
1122 owner_id: &str,
1123 lease_ttl_ms: u64,
1124 ) -> Result<ProcessLease, lash_core::PluginError> {
1125 let process_id = process_id.to_string();
1126 let owner_id = owner_id.to_string();
1127 self.conn
1128 .write_flow(move |tx| {
1129 Ok(tx_outcome((|| {
1130 if Self::load_process_conn(tx, &process_id)?.is_none() {
1131 return Err(lash_core::PluginError::Session(format!(
1132 "unknown process `{process_id}`"
1133 )));
1134 }
1135 let now = current_epoch_ms();
1136 let current = Self::load_process_lease_conn(tx, &process_id)?;
1137 if let Some(current) = current.as_ref()
1138 && current.expires_at_epoch_ms > now
1139 && current.owner_id != owner_id
1140 {
1141 return Err(process_lease_conflict(&process_id, current));
1142 }
1143 let fencing_token: u64 = tx
1148 .query_row(
1149 "SELECT lease_fencing_token FROM process_leases WHERE process_id = ?1",
1150 params![process_id],
1151 |row| row.get::<_, i64>(0),
1152 )
1153 .optional()
1154 .map_err(process_sqlite_error)?
1155 .unwrap_or(0) as u64
1156 + 1;
1157 let lease = ProcessLease {
1158 schema_version: PROCESS_LEASE_SCHEMA_VERSION,
1159 process_id: process_id.clone(),
1160 owner_id: owner_id.clone(),
1161 lease_token: format!(
1162 "{:x}",
1163 Sha256::digest(
1164 format!("{process_id}:{owner_id}:{now}:{fencing_token}").as_bytes()
1165 )
1166 ),
1167 fencing_token,
1168 claimed_at_epoch_ms: now,
1169 expires_at_epoch_ms: now.saturating_add(lease_ttl_ms),
1170 };
1171 tx.execute(
1172 "INSERT INTO process_leases (
1173 process_id, lease_owner_id, lease_token, lease_fencing_token,
1174 lease_claimed_at_ms, lease_expires_at_ms
1175 )
1176 VALUES (?1, ?2, ?3, ?4, ?5, ?6)
1177 ON CONFLICT(process_id) DO UPDATE SET
1178 lease_owner_id = excluded.lease_owner_id,
1179 lease_token = excluded.lease_token,
1180 lease_fencing_token = excluded.lease_fencing_token,
1181 lease_claimed_at_ms = excluded.lease_claimed_at_ms,
1182 lease_expires_at_ms = excluded.lease_expires_at_ms",
1183 params![
1184 lease.process_id.as_str(),
1185 lease.owner_id.as_str(),
1186 lease.lease_token.as_str(),
1187 lease.fencing_token as i64,
1188 lease.claimed_at_epoch_ms as i64,
1189 lease.expires_at_epoch_ms as i64,
1190 ],
1191 )
1192 .map_err(process_sqlite_error)?;
1193 Ok(lease)
1194 })()))
1195 })
1196 .await
1197 .map_err(process_sqlite_error)?
1198 }
1199
1200 async fn renew_process_lease(
1201 &self,
1202 lease: &ProcessLease,
1203 lease_ttl_ms: u64,
1204 ) -> Result<ProcessLease, lash_core::PluginError> {
1205 let lease = lease.clone();
1206 self.conn
1207 .write_flow(move |tx| {
1208 Ok(tx_outcome((|| {
1209 let now = current_epoch_ms();
1210 let current = Self::load_process_lease_conn(tx, &lease.process_id)?;
1211 if !guard_lease(current.as_ref(), &lease.lease_token, now) {
1212 return Err(process_lease_expired(&lease.process_id));
1213 }
1214 let renewed = ProcessLease {
1215 expires_at_epoch_ms: now.saturating_add(lease_ttl_ms),
1216 ..lease.clone()
1217 };
1218 tx.execute(
1219 "UPDATE process_leases
1220 SET lease_expires_at_ms = ?2
1221 WHERE process_id = ?1 AND lease_token = ?3",
1222 params![
1223 renewed.process_id.as_str(),
1224 renewed.expires_at_epoch_ms as i64,
1225 renewed.lease_token.as_str(),
1226 ],
1227 )
1228 .map_err(process_sqlite_error)?;
1229 Ok(renewed)
1230 })()))
1231 })
1232 .await
1233 .map_err(process_sqlite_error)?
1234 }
1235
1236 async fn complete_process_lease(
1237 &self,
1238 completion: &ProcessLeaseCompletion,
1239 ) -> Result<(), lash_core::PluginError> {
1240 let process_id = completion.process_id.clone();
1241 let lease_token = completion.lease_token.clone();
1242 self.conn
1243 .call(move |conn| {
1244 conn.execute(
1245 "UPDATE process_leases
1246 SET lease_owner_id = NULL,
1247 lease_token = NULL,
1248 lease_claimed_at_ms = 0,
1249 lease_expires_at_ms = 0
1250 WHERE process_id = ?1 AND lease_token = ?2",
1251 params![process_id, lease_token],
1252 )
1253 })
1254 .await
1255 .map_err(process_sqlite_error)?;
1256 Ok(())
1257 }
1258}
1259
1260fn process_lease_conflict(process_id: &str, current: &ProcessLease) -> lash_core::PluginError {
1263 lash_core::PluginError::Session(format!(
1264 "process `{process_id}` is already leased by `{}` until {}",
1265 current.owner_id, current.expires_at_epoch_ms
1266 ))
1267}
1268
1269fn process_lease_expired(process_id: &str) -> lash_core::PluginError {
1271 lash_core::PluginError::Session(format!(
1272 "process lease for `{process_id}` is missing or expired"
1273 ))
1274}
1275
1276fn process_external_ref_conflict(
1277 process_id: &str,
1278 existing: &ProcessExternalRef,
1279 new: &ProcessExternalRef,
1280) -> lash_core::PluginError {
1281 lash_core::PluginError::Session(format!(
1282 "process `{process_id}` external ref conflict: existing {existing:?}, new {new:?}"
1283 ))
1284}