1use super::*;
26
27fn process_status_label(record: &ProcessRecord) -> &'static str {
28 record.status.label()
29}
30
31impl SqliteProcessRegistry {
32 pub async fn open(path: &Path) -> tokio_rusqlite::Result<Self> {
33 let conn = SqliteConnection::open(path).await?;
34 ensure_process_schema(&conn).await?;
35 apply_pragmas(&conn, StoreBacking::File).await?;
36 Ok(Self {
37 conn,
38 notify: tokio::sync::Notify::new(),
39 })
40 }
41
42 pub async fn memory() -> tokio_rusqlite::Result<Self> {
43 let conn = SqliteConnection::open_in_memory().await?;
44 ensure_process_schema(&conn).await?;
45 apply_pragmas(&conn, StoreBacking::Memory).await?;
46 Ok(Self {
47 conn,
48 notify: tokio::sync::Notify::new(),
49 })
50 }
51
52 fn load_process_conn(
53 conn: &Connection,
54 process_id: &str,
55 ) -> Result<Option<ProcessRecord>, lash_core::PluginError> {
56 let json: Option<String> = conn
57 .query_row(
58 "SELECT record_json FROM processes WHERE process_id = ?1",
59 params![process_id],
60 |row| row.get(0),
61 )
62 .optional()
63 .map_err(process_sqlite_error)?;
64 json.map(|json| serde_json::from_str(&json).map_err(process_decode_error))
65 .transpose()
66 }
67
68 fn save_process_conn(
69 conn: &Connection,
70 record: &ProcessRecord,
71 ) -> Result<(), lash_core::PluginError> {
72 conn.execute(
73 "UPDATE processes
74 SET updated_at_ms = ?2, status = ?3, record_json = ?4
75 WHERE process_id = ?1",
76 params![
77 record.id.as_str(),
78 record.updated_at_ms as i64,
79 process_status_label(record),
80 process_encode_json(record)?
81 ],
82 )
83 .map_err(process_sqlite_error)?;
84 Ok(())
85 }
86
87 fn load_event_by_key_conn(
88 conn: &Connection,
89 process_id: &str,
90 replay_key: &str,
91 ) -> Result<Option<(String, ProcessEvent)>, lash_core::PluginError> {
92 let row: Option<(String, String)> = conn
93 .query_row(
94 "SELECT payload_hash, event_json
95 FROM process_events
96 WHERE process_id = ?1 AND idempotency_key = ?2",
97 params![process_id, replay_key],
98 |row| Ok((row.get(0)?, row.get(1)?)),
99 )
100 .optional()
101 .map_err(process_sqlite_error)?;
102 row.map(|(hash, json)| {
103 serde_json::from_str(&json)
104 .map(|event| (hash, event))
105 .map_err(process_decode_error)
106 })
107 .transpose()
108 }
109
110 fn load_process_lease_conn(
111 conn: &Connection,
112 process_id: &str,
113 ) -> Result<Option<ProcessLease>, lash_core::PluginError> {
114 conn.query_row(
115 "SELECT lease_owner_id, lease_token, lease_fencing_token,
116 lease_claimed_at_ms, lease_expires_at_ms
117 FROM process_leases
118 WHERE process_id = ?1",
119 params![process_id],
120 |row| {
121 let owner_id: Option<String> = row.get(0)?;
122 let lease_token: Option<String> = row.get(1)?;
123 let (Some(owner_id), Some(lease_token)) = (owner_id, lease_token) else {
124 return Ok(None);
125 };
126 Ok(Some(ProcessLease {
127 schema_version: PROCESS_LEASE_SCHEMA_VERSION,
128 process_id: process_id.to_string(),
129 owner_id,
130 lease_token,
131 fencing_token: row.get::<_, i64>(2)? as u64,
132 claimed_at_epoch_ms: row.get::<_, i64>(3)? as u64,
133 expires_at_epoch_ms: row.get::<_, i64>(4)? as u64,
134 }))
135 },
136 )
137 .optional()
138 .map(|lease| lease.flatten())
139 .map_err(process_sqlite_error)
140 }
141
142 fn list_grants_for_scope_conn(
143 conn: &Connection,
144 session_scope: &SessionScope,
145 live_only: bool,
146 ) -> Result<Vec<ProcessHandleGrantEntry>, lash_core::PluginError> {
147 let session_scope_id = session_scope.id();
148 let status_clause = if live_only {
149 "AND p.status = 'running'"
150 } else {
151 ""
152 };
153 let mut stmt = conn
154 .prepare(&format!(
155 "SELECT g.process_id, g.descriptor_json, p.record_json
156 FROM process_handle_grants g
157 JOIN processes p ON p.process_id = g.process_id
158 WHERE g.scope_id = ?1 {status_clause}
159 ORDER BY g.process_id ASC"
160 ))
161 .map_err(process_sqlite_error)?;
162 let rows = stmt
163 .query_map(params![session_scope_id.as_str()], |row| {
164 Ok((
165 row.get::<_, String>(0)?,
166 row.get::<_, String>(1)?,
167 row.get::<_, String>(2)?,
168 ))
169 })
170 .map_err(process_sqlite_error)?;
171 let mut entries = Vec::new();
172 for row in rows {
173 let (process_id, descriptor_json, record_json) = row.map_err(process_sqlite_error)?;
174 let descriptor: ProcessHandleDescriptor =
175 serde_json::from_str(&descriptor_json).map_err(process_decode_error)?;
176 let record: ProcessRecord =
177 serde_json::from_str(&record_json).map_err(process_decode_error)?;
178 entries.push((
179 ProcessHandleGrant {
180 session_id: session_scope.session_id.clone(),
181 process_id,
182 descriptor,
183 },
184 record,
185 ));
186 }
187 Ok(entries)
188 }
189}
190
191fn tx_outcome<T>(
196 result: Result<T, lash_core::PluginError>,
197) -> TxOutcome<Result<T, lash_core::PluginError>> {
198 match result {
199 Ok(value) => TxOutcome::Commit(Ok(value)),
200 Err(err) => TxOutcome::Rollback(Err(err)),
201 }
202}
203
204#[async_trait::async_trait]
205impl ProcessRegistry for SqliteProcessRegistry {
206 fn durability_tier(&self) -> DurabilityTier {
207 DurabilityTier::Durable
208 }
209
210 async fn register_process(
211 &self,
212 registration: ProcessRegistration,
213 ) -> Result<ProcessRecord, lash_core::PluginError> {
214 let (registration, registration_hash) = prepare_process_registration(registration)?;
215 let record = self
216 .conn
217 .write_flow(move |tx| {
218 Ok(tx_outcome((|| {
219 if let Some(existing) = Self::load_process_conn(tx, ®istration.id)? {
220 if existing.registration_hash == registration_hash {
221 return Ok(existing);
222 }
223 return Err(lash_core::PluginError::Session(format!(
224 "process `{}` registration hash conflict: existing {}, new {}",
225 registration.id, existing.registration_hash, registration_hash
226 )));
227 }
228 let now = current_epoch_ms();
229 let record = ProcessRecord::from_prepared_registration(
230 registration,
231 registration_hash,
232 now,
233 );
234 let originator_scope_id = record.originator_scope_id();
235 tx.execute(
236 "INSERT INTO processes (
237 process_id, registration_hash, owner_scope_id,
238 created_at_ms, updated_at_ms, status, record_json
239 )
240 VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7)",
241 params![
242 record.id.as_str(),
243 record.registration_hash.as_str(),
244 originator_scope_id.as_str(),
245 record.created_at_ms as i64,
246 record.updated_at_ms as i64,
247 process_status_label(&record),
248 process_encode_json(&record)?,
249 ],
250 )
251 .map_err(process_sqlite_error)?;
252 Ok(record)
253 })()))
254 })
255 .await
256 .map_err(process_sqlite_error)??;
257 self.notify.notify_waiters();
258 Ok(record)
259 }
260
261 async fn set_external_ref(
262 &self,
263 process_id: &str,
264 external_ref: ProcessExternalRef,
265 ) -> Result<ProcessRecord, lash_core::PluginError> {
266 let process_id = process_id.to_string();
267 let record = self
268 .conn
269 .write_flow(move |tx| {
270 Ok(tx_outcome((|| {
271 let mut record =
272 Self::load_process_conn(tx, &process_id)?.ok_or_else(|| {
273 lash_core::PluginError::Session(format!(
274 "unknown process `{process_id}`"
275 ))
276 })?;
277 record.external_ref = Some(external_ref);
278 record.updated_at_ms = current_epoch_ms();
279 Self::save_process_conn(tx, &record)?;
280 Ok(record)
281 })()))
282 })
283 .await
284 .map_err(process_sqlite_error)??;
285 self.notify.notify_waiters();
286 Ok(record)
287 }
288
289 async fn grant_handle(
290 &self,
291 session_scope: &SessionScope,
292 process_id: &str,
293 descriptor: ProcessHandleDescriptor,
294 ) -> Result<ProcessHandleGrant, lash_core::PluginError> {
295 let session_scope = session_scope.clone();
296 let process_id = process_id.to_string();
297 self.conn
298 .write_flow(move |tx| {
299 Ok(tx_outcome((|| {
300 let session_scope_id = session_scope.id();
301 if Self::load_process_conn(tx, &process_id)?.is_none() {
302 return Err(lash_core::PluginError::Session(format!(
303 "unknown process `{process_id}`"
304 )));
305 }
306 tx.execute(
307 "INSERT INTO process_handle_grants (session_id, scope_id, process_id, descriptor_json)
308 VALUES (?1, ?2, ?3, ?4)
309 ON CONFLICT(scope_id, process_id) DO UPDATE SET
310 session_id = excluded.session_id,
311 descriptor_json = excluded.descriptor_json",
312 params![
313 session_scope.session_id.as_str(),
314 session_scope_id.as_str(),
315 process_id.as_str(),
316 process_encode_json(&descriptor)?
317 ],
318 )
319 .map_err(process_sqlite_error)?;
320 Ok(ProcessHandleGrant {
321 session_id: session_scope.session_id.clone(),
322 process_id: process_id.clone(),
323 descriptor,
324 })
325 })()))
326 })
327 .await
328 .map_err(process_sqlite_error)?
329 }
330
331 async fn revoke_handle(
332 &self,
333 session_scope: &SessionScope,
334 process_id: &str,
335 ) -> Result<(), lash_core::PluginError> {
336 let session_scope_id = session_scope.id().as_str().to_string();
337 let process_id = process_id.to_string();
338 self.conn
339 .call(move |conn| {
340 conn.execute(
341 "DELETE FROM process_handle_grants WHERE scope_id = ?1 AND process_id = ?2",
342 params![session_scope_id, process_id],
343 )
344 })
345 .await
346 .map_err(process_sqlite_error)?;
347 Ok(())
348 }
349
350 async fn transfer_handle_grants(
351 &self,
352 from_scope: &SessionScope,
353 to_scope: &SessionScope,
354 process_ids: &[String],
355 ) -> Result<(), lash_core::PluginError> {
356 let from_scope = from_scope.clone();
357 let to_scope = to_scope.clone();
358 let process_ids = process_ids.to_vec();
359 self.conn
360 .write_flow(move |tx| {
361 Ok(tx_outcome((|| {
362 let from_scope_id = from_scope.id();
363 let to_scope_id = to_scope.id();
364 for process_id in &process_ids {
365 let descriptor_json: Option<String> = tx
366 .query_row(
367 "SELECT descriptor_json
368 FROM process_handle_grants
369 WHERE scope_id = ?1 AND process_id = ?2",
370 params![from_scope_id.as_str(), process_id.as_str()],
371 |row| row.get(0),
372 )
373 .optional()
374 .map_err(process_sqlite_error)?;
375 let Some(descriptor_json) = descriptor_json else {
376 return Err(lash_core::PluginError::Session(format!(
377 "process handle `{process_id}` is not granted to session `{}`",
378 from_scope.session_id
379 )));
380 };
381 tx.execute(
382 "DELETE FROM process_handle_grants
383 WHERE scope_id = ?1 AND process_id = ?2",
384 params![from_scope_id.as_str(), process_id.as_str()],
385 )
386 .map_err(process_sqlite_error)?;
387 tx.execute(
388 "INSERT INTO process_handle_grants (session_id, scope_id, process_id, descriptor_json)
389 VALUES (?1, ?2, ?3, ?4)
390 ON CONFLICT(scope_id, process_id) DO UPDATE SET
391 session_id = excluded.session_id,
392 descriptor_json = excluded.descriptor_json",
393 params![
394 to_scope.session_id.as_str(),
395 to_scope_id.as_str(),
396 process_id.as_str(),
397 descriptor_json
398 ],
399 )
400 .map_err(process_sqlite_error)?;
401 }
402 Ok(())
403 })()))
404 })
405 .await
406 .map_err(process_sqlite_error)?
407 }
408
409 async fn list_handle_grants(
410 &self,
411 session_scope: &SessionScope,
412 ) -> Result<Vec<ProcessHandleGrantEntry>, lash_core::PluginError> {
413 let session_scope = session_scope.clone();
414 self.conn
415 .call(move |conn| {
416 Ok(Self::list_grants_for_scope_conn(
417 conn,
418 &session_scope,
419 false,
420 ))
421 })
422 .await
423 .map_err(process_sqlite_error)?
424 }
425
426 async fn list_live_handle_grants(
427 &self,
428 session_scope: &SessionScope,
429 ) -> Result<Vec<ProcessHandleGrantEntry>, lash_core::PluginError> {
430 let session_scope = session_scope.clone();
431 self.conn
432 .call(move |conn| Ok(Self::list_grants_for_scope_conn(conn, &session_scope, true)))
433 .await
434 .map_err(process_sqlite_error)?
435 }
436
437 async fn has_handle_grant(
438 &self,
439 session_scope: &SessionScope,
440 process_id: &str,
441 ) -> Result<bool, lash_core::PluginError> {
442 let session_scope_id = session_scope.id().as_str().to_string();
443 let process_id = process_id.to_string();
444 self.conn
445 .call(move |conn| {
446 let exists = conn
447 .query_row(
448 "SELECT 1
449 FROM process_handle_grants g
450 JOIN processes p ON p.process_id = g.process_id
451 WHERE g.scope_id = ?1 AND g.process_id = ?2
452 LIMIT 1",
453 params![session_scope_id, process_id],
454 |_| Ok(()),
455 )
456 .optional()?
457 .is_some();
458 Ok(exists)
459 })
460 .await
461 .map_err(process_sqlite_error)
462 }
463
464 async fn handle_grants_for_process(
465 &self,
466 process_id: &str,
467 ) -> Result<Vec<ProcessHandleGrant>, lash_core::PluginError> {
468 let process_id = process_id.to_string();
469 self.conn
470 .call(move |conn| {
471 Ok((|| {
472 if Self::load_process_conn(conn, &process_id)?.is_none() {
473 return Err(lash_core::PluginError::Session(format!(
474 "unknown process `{process_id}`"
475 )));
476 }
477 let mut stmt = conn
478 .prepare(
479 "SELECT session_id, descriptor_json
480 FROM process_handle_grants
481 WHERE process_id = ?1
482 ORDER BY session_id ASC, scope_id ASC",
483 )
484 .map_err(process_sqlite_error)?;
485 let rows = stmt
486 .query_map(params![process_id], |row| {
487 Ok((row.get::<_, String>(0)?, row.get::<_, String>(1)?))
488 })
489 .map_err(process_sqlite_error)?;
490 let mut grants = Vec::new();
491 for row in rows {
492 let (session_id, descriptor_json) = row.map_err(process_sqlite_error)?;
493 let descriptor: ProcessHandleDescriptor =
494 serde_json::from_str(&descriptor_json).map_err(process_decode_error)?;
495 grants.push(ProcessHandleGrant {
496 session_id,
497 process_id: process_id.clone(),
498 descriptor,
499 });
500 }
501 Ok(grants)
502 })())
503 })
504 .await
505 .map_err(process_sqlite_error)?
506 }
507
508 async fn delete_session_process_state(
509 &self,
510 session_id: &str,
511 ) -> Result<lash_core::ProcessSessionDeleteReport, lash_core::PluginError> {
512 let session_id_owned = session_id.to_string();
513 let (
514 revoked_handle_count,
515 deleted_wake_count,
516 mut orphaned_process_ids,
517 mut preserved_process_ids,
518 ) = self
519 .conn
520 .write_flow(move |tx| {
521 Ok(tx_outcome((|| {
522 let session_id = session_id_owned;
523 let removed = {
524 let mut stmt = tx
525 .prepare(
526 "SELECT g.process_id, p.record_json
527 FROM process_handle_grants g
528 JOIN processes p ON p.process_id = g.process_id
529 WHERE g.session_id = ?1
530 ORDER BY g.process_id ASC",
531 )
532 .map_err(process_sqlite_error)?;
533 let rows = stmt
534 .query_map(params![session_id], |row| {
535 Ok((row.get::<_, String>(0)?, row.get::<_, String>(1)?))
536 })
537 .map_err(process_sqlite_error)?;
538 let mut removed = Vec::new();
539 for row in rows {
540 let (process_id, record_json) = row.map_err(process_sqlite_error)?;
541 let record: ProcessRecord =
542 serde_json::from_str(&record_json).map_err(process_decode_error)?;
543 removed.push((process_id, record));
544 }
545 removed
546 };
547
548 let deleted_wake_count = 0;
553 let revoked_handle_count = tx
554 .execute(
555 "DELETE FROM process_handle_grants WHERE session_id = ?1",
556 params![session_id],
557 )
558 .map_err(process_sqlite_error)?;
559 let mut orphaned_process_ids = Vec::new();
560 let mut preserved_process_ids = Vec::new();
561 for (process_id, record) in removed {
562 if record.is_terminal() {
563 continue;
564 }
565 let remaining_grants: i64 = tx
566 .query_row(
567 "SELECT COUNT(*) FROM process_handle_grants WHERE process_id = ?1",
568 params![process_id],
569 |row| row.get(0),
570 )
571 .map_err(process_sqlite_error)?;
572 if remaining_grants == 0 {
573 orphaned_process_ids.push(process_id);
574 } else {
575 preserved_process_ids.push(process_id);
576 }
577 }
578 let wake_targeted = {
579 let mut stmt = tx
580 .prepare("SELECT process_id, record_json FROM processes ORDER BY process_id ASC")
581 .map_err(process_sqlite_error)?;
582 let rows = stmt
583 .query_map([], |row| {
584 Ok((row.get::<_, String>(0)?, row.get::<_, String>(1)?))
585 })
586 .map_err(process_sqlite_error)?;
587 let mut records = Vec::new();
588 for row in rows {
589 let (process_id, record_json) = row.map_err(process_sqlite_error)?;
590 let record: ProcessRecord =
591 serde_json::from_str(&record_json).map_err(process_decode_error)?;
592 records.push((process_id, record));
593 }
594 records
595 };
596 for (_process_id, mut record) in wake_targeted {
597 if record.clear_wake_target_for_session(&session_id) {
598 Self::save_process_conn(tx, &record)?;
599 }
600 }
601 Ok((
602 revoked_handle_count,
603 deleted_wake_count,
604 orphaned_process_ids,
605 preserved_process_ids,
606 ))
607 })()))
608 })
609 .await
610 .map_err(process_sqlite_error)??;
611 orphaned_process_ids.sort();
612 orphaned_process_ids.dedup();
613 preserved_process_ids.sort();
614 preserved_process_ids.dedup();
615 Ok(lash_core::ProcessSessionDeleteReport {
616 session_id: session_id.to_string(),
617 revoked_handle_count,
618 deleted_wake_count,
619 orphaned_process_ids,
620 preserved_process_ids,
621 })
622 }
623
624 async fn append_event(
625 &self,
626 process_id: &str,
627 request: ProcessEventAppendRequest,
628 ) -> Result<ProcessEventAppendResult, lash_core::PluginError> {
629 let process_id = process_id.to_string();
630 let (result, appended) = self
631 .conn
632 .write_flow(move |tx| {
633 Ok(tx_outcome((|| {
634 let mut record =
635 Self::load_process_conn(tx, &process_id)?.ok_or_else(|| {
636 lash_core::PluginError::Session(format!(
637 "unknown process `{process_id}`"
638 ))
639 })?;
640 let replay_lookup = if let Some(replay_key) =
641 request.replay.as_ref().map(|replay| replay.key.as_str())
642 {
643 Self::load_event_by_key_conn(tx, &process_id, replay_key)?
644 } else {
645 None
646 };
647 let sequence = tx
648 .query_row(
649 "SELECT COALESCE(MAX(sequence), 0) + 1 FROM process_events WHERE process_id = ?1",
650 params![process_id],
651 |row| row.get::<_, i64>(0),
652 )
653 .map_err(process_sqlite_error)? as u64;
654 let occurred_at_ms = current_epoch_ms();
655 let prepared = prepare_process_event_append(
656 &record,
657 request,
658 sequence,
659 replay_lookup,
660 occurred_at_ms,
661 )?;
662 match prepared {
663 lash_core::ProcessEventAppendPlan::Replay {
664 event,
665 repair_status,
666 wake_delivery,
667 occurred_at_ms,
668 } => {
669 let repaired = if let Some(status) = repair_status {
670 lash_core::apply_process_status_projection(
671 &mut record,
672 status,
673 occurred_at_ms,
674 );
675 Self::save_process_conn(tx, &record)?;
676 true
677 } else {
678 false
679 };
680 Ok((
681 ProcessEventAppendResult {
682 event,
683 wake_delivery,
684 },
685 repaired,
686 ))
687 }
688 lash_core::ProcessEventAppendPlan::Insert {
689 event,
690 payload_hash,
691 status_update,
692 wake_delivery,
693 occurred_at_ms,
694 } => {
695 tx.execute(
696 "INSERT INTO process_events (
697 process_id, sequence, event_type, payload_hash, idempotency_key,
698 occurred_at_ms, event_json
699 )
700 VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7)",
701 params![
702 process_id,
703 sequence as i64,
704 event.event_type.as_str(),
705 payload_hash.as_str(),
706 event.invocation.replay_key(),
707 occurred_at_ms as i64,
708 process_encode_json(&event)?,
709 ],
710 )
711 .map_err(process_sqlite_error)?;
712 if let Some(status) = status_update {
713 lash_core::apply_process_status_projection(
714 &mut record,
715 status,
716 occurred_at_ms,
717 );
718 } else {
719 record.updated_at_ms = occurred_at_ms;
720 }
721 Self::save_process_conn(tx, &record)?;
722 Ok((
723 ProcessEventAppendResult {
724 event,
725 wake_delivery,
726 },
727 true,
728 ))
729 }
730 }
731 })()))
732 })
733 .await
734 .map_err(process_sqlite_error)??;
735 if appended {
736 self.notify.notify_waiters();
737 }
738 Ok(result)
739 }
740
741 async fn events_after(
742 &self,
743 process_id: &str,
744 after_sequence: u64,
745 ) -> Result<Vec<ProcessEvent>, lash_core::PluginError> {
746 let process_id = process_id.to_string();
747 self.conn
748 .call(move |conn| {
749 Ok((|| {
750 if Self::load_process_conn(conn, &process_id)?.is_none() {
751 return Err(lash_core::PluginError::Session(format!(
752 "unknown process `{process_id}`"
753 )));
754 }
755 let mut stmt = conn
756 .prepare(
757 "SELECT event_json FROM process_events
758 WHERE process_id = ?1 AND sequence > ?2
759 ORDER BY sequence ASC",
760 )
761 .map_err(process_sqlite_error)?;
762 let rows = stmt
763 .query_map(params![process_id, after_sequence as i64], |row| {
764 row.get::<_, String>(0)
765 })
766 .map_err(process_sqlite_error)?;
767 let mut events = Vec::new();
768 for row in rows {
769 events.push(
770 serde_json::from_str(&row.map_err(process_sqlite_error)?)
771 .map_err(process_decode_error)?,
772 );
773 }
774 Ok(events)
775 })())
776 })
777 .await
778 .map_err(process_sqlite_error)?
779 }
780
781 async fn count_events_through(
782 &self,
783 process_id: &str,
784 event_type: &str,
785 up_to_sequence: u64,
786 ) -> Result<u64, lash_core::PluginError> {
787 let process_id = process_id.to_string();
788 let event_type = event_type.to_string();
789 self.conn
790 .call(move |conn| {
791 Ok((|| {
792 if Self::load_process_conn(conn, &process_id)?.is_none() {
793 return Err(lash_core::PluginError::Session(format!(
794 "unknown process `{process_id}`"
795 )));
796 }
797 conn.query_row(
798 "SELECT COUNT(*) FROM process_events
799 WHERE process_id = ?1 AND event_type = ?2 AND sequence <= ?3",
800 params![process_id, event_type, up_to_sequence as i64],
801 |row| row.get::<_, i64>(0),
802 )
803 .map(|count| count as u64)
804 .map_err(process_sqlite_error)
805 })())
806 })
807 .await
808 .map_err(process_sqlite_error)?
809 }
810
811 async fn recent_events(
812 &self,
813 process_id: &str,
814 limit: usize,
815 ) -> Result<Vec<ProcessEvent>, lash_core::PluginError> {
816 let process_id = process_id.to_string();
817 self.conn
818 .call(move |conn| {
819 Ok((|| {
820 if Self::load_process_conn(conn, &process_id)?.is_none() {
821 return Err(lash_core::PluginError::Session(format!(
822 "unknown process `{process_id}`"
823 )));
824 }
825 let mut stmt = conn
826 .prepare(
827 "SELECT event_json FROM process_events
828 WHERE process_id = ?1
829 ORDER BY sequence DESC
830 LIMIT ?2",
831 )
832 .map_err(process_sqlite_error)?;
833 let rows = stmt
834 .query_map(params![process_id, limit as i64], |row| {
835 row.get::<_, String>(0)
836 })
837 .map_err(process_sqlite_error)?;
838 let mut events: Vec<ProcessEvent> = Vec::new();
839 for row in rows {
840 events.push(
841 serde_json::from_str(&row.map_err(process_sqlite_error)?)
842 .map_err(process_decode_error)?,
843 );
844 }
845 events.reverse();
846 Ok(events)
847 })())
848 })
849 .await
850 .map_err(process_sqlite_error)?
851 }
852
853 async fn wake_events_after(
854 &self,
855 process_id: &str,
856 after_sequence: u64,
857 ) -> Result<Vec<ProcessEvent>, lash_core::PluginError> {
858 let acked: std::collections::HashSet<u64> = {
859 let process_id = process_id.to_string();
860 self.conn
861 .call(move |conn| {
862 Ok(
863 (|| -> Result<std::collections::HashSet<u64>, lash_core::PluginError> {
864 let mut stmt = conn
865 .prepare(
866 "SELECT sequence FROM process_wake_acks WHERE process_id = ?1",
867 )
868 .map_err(process_sqlite_error)?;
869 let rows = stmt
870 .query_map(params![process_id], |row| row.get::<_, i64>(0))
871 .map_err(process_sqlite_error)?;
872 let mut set = std::collections::HashSet::new();
873 for row in rows {
874 set.insert(row.map_err(process_sqlite_error)? as u64);
875 }
876 Ok(set)
877 })(),
878 )
879 })
880 .await
881 .map_err(process_sqlite_error)??
882 };
883 Ok(self
884 .events_after(process_id, after_sequence)
885 .await?
886 .into_iter()
887 .filter(|event| event.semantics.wake.is_some() && !acked.contains(&event.sequence))
888 .collect())
889 }
890
891 async fn wait_event_after(
892 &self,
893 process_id: &str,
894 event_type: &str,
895 after_sequence: u64,
896 ) -> Result<ProcessEvent, lash_core::PluginError> {
897 loop {
898 if let Some(event) = self
899 .events_after(process_id, after_sequence)
900 .await?
901 .into_iter()
902 .find(|event| event.event_type == event_type)
903 {
904 return Ok(event);
905 }
906 tokio::select! {
907 _ = self.notify.notified() => {}
908 _ = tokio::time::sleep(Duration::from_millis(50)) => {}
909 }
910 }
911 }
912
913 async fn await_process(
914 &self,
915 process_id: &str,
916 ) -> Result<ProcessAwaitOutput, lash_core::PluginError> {
917 loop {
918 let record = self.get_process(process_id).await.ok_or_else(|| {
919 lash_core::PluginError::Session(format!("unknown process `{process_id}`"))
920 })?;
921 if let Some(await_output) = record.status.await_output() {
922 return Ok(await_output.clone());
923 }
924 tokio::select! {
925 _ = self.notify.notified() => {}
926 _ = tokio::time::sleep(Duration::from_millis(50)) => {}
927 }
928 }
929 }
930
931 async fn complete_process(
932 &self,
933 process_id: &str,
934 await_output: ProcessAwaitOutput,
935 ) -> Result<ProcessRecord, lash_core::PluginError> {
936 let event_type = match await_output.terminal_state() {
937 lash_core::ProcessTerminalState::Completed => "process.completed",
938 lash_core::ProcessTerminalState::Failed => "process.failed",
939 lash_core::ProcessTerminalState::Cancelled => "process.cancelled",
940 };
941 self.append_event(
942 process_id,
943 ProcessEventAppendRequest::new(
944 event_type,
945 serde_json::json!({ "await_output": await_output }),
946 )
947 .with_replay_key(format!("process:{process_id}:terminal:{event_type}")),
948 )
949 .await?;
950 self.get_process(process_id).await.ok_or_else(|| {
951 lash_core::PluginError::Session(format!(
952 "unknown process `{process_id}` after terminal event"
953 ))
954 })
955 }
956
957 async fn set_process_wait(
958 &self,
959 process_id: &str,
960 wait: lash_core::WaitState,
961 ) -> Result<ProcessRecord, lash_core::PluginError> {
962 let process_id = process_id.to_string();
963 self.conn
964 .write_flow(move |tx| {
965 Ok(tx_outcome((|| {
966 let mut record =
967 Self::load_process_conn(tx, &process_id)?.ok_or_else(|| {
968 lash_core::PluginError::Session(format!(
969 "unknown process `{process_id}`"
970 ))
971 })?;
972 if record.is_terminal() {
973 return Err(lash_core::PluginError::Session(format!(
974 "terminal process `{process_id}` cannot enter a wait state"
975 )));
976 }
977 record.wait = Some(wait);
978 record.updated_at_ms = current_epoch_ms();
979 Self::save_process_conn(tx, &record)?;
980 Ok(record)
981 })()))
982 })
983 .await
984 .map_err(process_sqlite_error)?
985 }
986
987 async fn clear_process_wait(
988 &self,
989 process_id: &str,
990 ) -> Result<ProcessRecord, lash_core::PluginError> {
991 let process_id = process_id.to_string();
992 self.conn
993 .write_flow(move |tx| {
994 Ok(tx_outcome((|| {
995 let mut record =
996 Self::load_process_conn(tx, &process_id)?.ok_or_else(|| {
997 lash_core::PluginError::Session(format!(
998 "unknown process `{process_id}`"
999 ))
1000 })?;
1001 record.wait = None;
1002 record.updated_at_ms = current_epoch_ms();
1003 Self::save_process_conn(tx, &record)?;
1004 Ok(record)
1005 })()))
1006 })
1007 .await
1008 .map_err(process_sqlite_error)?
1009 }
1010
1011 async fn get_process(&self, process_id: &str) -> Option<ProcessRecord> {
1012 let process_id = process_id.to_string();
1013 self.conn
1014 .call(move |conn| Ok(Self::load_process_conn(conn, &process_id).ok().flatten()))
1015 .await
1016 .ok()
1017 .flatten()
1018 }
1019
1020 async fn list_processes(
1021 &self,
1022 filter: &lash_core::ProcessListFilter,
1023 ) -> Result<Vec<ProcessRecord>, lash_core::PluginError> {
1024 let filter = filter.clone();
1025 self.conn
1026 .call(move |conn| {
1027 Ok((|| {
1028 let mut stmt = conn
1029 .prepare(
1030 "SELECT record_json FROM processes
1031 ORDER BY process_id ASC",
1032 )
1033 .map_err(process_sqlite_error)?;
1034 let rows = stmt
1035 .query_map([], |row| row.get::<_, String>(0))
1036 .map_err(process_sqlite_error)?;
1037 let mut records = Vec::new();
1038 for row in rows {
1039 let record: ProcessRecord =
1040 serde_json::from_str(&row.map_err(process_sqlite_error)?)
1041 .map_err(process_decode_error)?;
1042 if filter.matches_record(&record) {
1043 records.push(record);
1044 }
1045 }
1046 Ok(records)
1047 })())
1048 })
1049 .await
1050 .map_err(process_sqlite_error)?
1051 }
1052
1053 async fn ack_wake(
1054 &self,
1055 process_id: &str,
1056 sequence: u64,
1057 ) -> Result<(), lash_core::PluginError> {
1058 let process_id = process_id.to_string();
1059 self.conn
1060 .call(move |conn| {
1061 Ok((|| {
1062 if Self::load_process_conn(conn, &process_id)?.is_none() {
1063 return Err(lash_core::PluginError::Session(format!(
1064 "unknown process `{process_id}`"
1065 )));
1066 }
1067 conn.execute(
1068 "INSERT OR IGNORE INTO process_wake_acks (process_id, sequence) VALUES (?1, ?2)",
1069 params![process_id, sequence as i64],
1070 )
1071 .map_err(process_sqlite_error)?;
1072 Ok(())
1073 })())
1074 })
1075 .await
1076 .map_err(process_sqlite_error)?
1077 }
1078
1079 async fn list_non_terminal(&self) -> Result<Vec<ProcessRecord>, lash_core::PluginError> {
1080 self.conn
1081 .call(move |conn| {
1082 Ok((|| {
1083 let mut stmt = conn
1084 .prepare(
1085 "SELECT record_json FROM processes
1086 WHERE status = 'running'
1087 ORDER BY process_id ASC",
1088 )
1089 .map_err(process_sqlite_error)?;
1090 let rows = stmt
1091 .query_map([], |row| row.get::<_, String>(0))
1092 .map_err(process_sqlite_error)?;
1093 let mut records = Vec::new();
1094 for row in rows {
1095 let record: ProcessRecord =
1096 serde_json::from_str(&row.map_err(process_sqlite_error)?)
1097 .map_err(process_decode_error)?;
1098 records.push(record);
1099 }
1100 Ok(records)
1101 })())
1102 })
1103 .await
1104 .map_err(process_sqlite_error)?
1105 }
1106
1107 async fn claim_process_lease(
1108 &self,
1109 process_id: &str,
1110 owner_id: &str,
1111 lease_ttl_ms: u64,
1112 ) -> Result<ProcessLease, lash_core::PluginError> {
1113 let process_id = process_id.to_string();
1114 let owner_id = owner_id.to_string();
1115 self.conn
1116 .write_flow(move |tx| {
1117 Ok(tx_outcome((|| {
1118 if Self::load_process_conn(tx, &process_id)?.is_none() {
1119 return Err(lash_core::PluginError::Session(format!(
1120 "unknown process `{process_id}`"
1121 )));
1122 }
1123 let now = current_epoch_ms();
1124 let current = Self::load_process_lease_conn(tx, &process_id)?;
1125 if let Some(current) = current.as_ref()
1126 && current.expires_at_epoch_ms > now
1127 && current.owner_id != owner_id
1128 {
1129 return Err(process_lease_conflict(&process_id, current));
1130 }
1131 let fencing_token: u64 = tx
1136 .query_row(
1137 "SELECT lease_fencing_token FROM process_leases WHERE process_id = ?1",
1138 params![process_id],
1139 |row| row.get::<_, i64>(0),
1140 )
1141 .optional()
1142 .map_err(process_sqlite_error)?
1143 .unwrap_or(0) as u64
1144 + 1;
1145 let lease = ProcessLease {
1146 schema_version: PROCESS_LEASE_SCHEMA_VERSION,
1147 process_id: process_id.clone(),
1148 owner_id: owner_id.clone(),
1149 lease_token: format!(
1150 "{:x}",
1151 Sha256::digest(
1152 format!("{process_id}:{owner_id}:{now}:{fencing_token}").as_bytes()
1153 )
1154 ),
1155 fencing_token,
1156 claimed_at_epoch_ms: now,
1157 expires_at_epoch_ms: now.saturating_add(lease_ttl_ms),
1158 };
1159 tx.execute(
1160 "INSERT INTO process_leases (
1161 process_id, lease_owner_id, lease_token, lease_fencing_token,
1162 lease_claimed_at_ms, lease_expires_at_ms
1163 )
1164 VALUES (?1, ?2, ?3, ?4, ?5, ?6)
1165 ON CONFLICT(process_id) DO UPDATE SET
1166 lease_owner_id = excluded.lease_owner_id,
1167 lease_token = excluded.lease_token,
1168 lease_fencing_token = excluded.lease_fencing_token,
1169 lease_claimed_at_ms = excluded.lease_claimed_at_ms,
1170 lease_expires_at_ms = excluded.lease_expires_at_ms",
1171 params![
1172 lease.process_id.as_str(),
1173 lease.owner_id.as_str(),
1174 lease.lease_token.as_str(),
1175 lease.fencing_token as i64,
1176 lease.claimed_at_epoch_ms as i64,
1177 lease.expires_at_epoch_ms as i64,
1178 ],
1179 )
1180 .map_err(process_sqlite_error)?;
1181 Ok(lease)
1182 })()))
1183 })
1184 .await
1185 .map_err(process_sqlite_error)?
1186 }
1187
1188 async fn renew_process_lease(
1189 &self,
1190 lease: &ProcessLease,
1191 lease_ttl_ms: u64,
1192 ) -> Result<ProcessLease, lash_core::PluginError> {
1193 let lease = lease.clone();
1194 self.conn
1195 .write_flow(move |tx| {
1196 Ok(tx_outcome((|| {
1197 let now = current_epoch_ms();
1198 let current = Self::load_process_lease_conn(tx, &lease.process_id)?;
1199 if !guard_lease(current.as_ref(), &lease.lease_token, now) {
1200 return Err(process_lease_expired(&lease.process_id));
1201 }
1202 let renewed = ProcessLease {
1203 expires_at_epoch_ms: now.saturating_add(lease_ttl_ms),
1204 ..lease.clone()
1205 };
1206 tx.execute(
1207 "UPDATE process_leases
1208 SET lease_expires_at_ms = ?2
1209 WHERE process_id = ?1 AND lease_token = ?3",
1210 params![
1211 renewed.process_id.as_str(),
1212 renewed.expires_at_epoch_ms as i64,
1213 renewed.lease_token.as_str(),
1214 ],
1215 )
1216 .map_err(process_sqlite_error)?;
1217 Ok(renewed)
1218 })()))
1219 })
1220 .await
1221 .map_err(process_sqlite_error)?
1222 }
1223
1224 async fn complete_process_lease(
1225 &self,
1226 completion: &ProcessLeaseCompletion,
1227 ) -> Result<(), lash_core::PluginError> {
1228 let process_id = completion.process_id.clone();
1229 let lease_token = completion.lease_token.clone();
1230 self.conn
1231 .call(move |conn| {
1232 conn.execute(
1233 "UPDATE process_leases
1234 SET lease_owner_id = NULL,
1235 lease_token = NULL,
1236 lease_claimed_at_ms = 0,
1237 lease_expires_at_ms = 0
1238 WHERE process_id = ?1 AND lease_token = ?2",
1239 params![process_id, lease_token],
1240 )
1241 })
1242 .await
1243 .map_err(process_sqlite_error)?;
1244 Ok(())
1245 }
1246}
1247
1248fn process_lease_conflict(process_id: &str, current: &ProcessLease) -> lash_core::PluginError {
1251 lash_core::PluginError::Session(format!(
1252 "process `{process_id}` is already leased by `{}` until {}",
1253 current.owner_id, current.expires_at_epoch_ms
1254 ))
1255}
1256
1257fn process_lease_expired(process_id: &str) -> lash_core::PluginError {
1259 lash_core::PluginError::Session(format!(
1260 "process lease for `{process_id}` is missing or expired"
1261 ))
1262}