1use super::*;
26
27fn process_status_label(record: &ProcessRecord) -> &'static str {
28 record.status.label()
29}
30
31impl SqliteProcessRegistry {
32 pub async fn open(path: &Path) -> tokio_rusqlite::Result<Self> {
33 let conn = SqliteConnection::open(path).await?;
34 ensure_process_schema(&conn).await?;
35 apply_pragmas(&conn, StoreBacking::File).await?;
36 Ok(Self {
37 conn,
38 notify: tokio::sync::Notify::new(),
39 })
40 }
41
42 pub async fn memory() -> tokio_rusqlite::Result<Self> {
43 let conn = SqliteConnection::open_in_memory().await?;
44 ensure_process_schema(&conn).await?;
45 apply_pragmas(&conn, StoreBacking::Memory).await?;
46 Ok(Self {
47 conn,
48 notify: tokio::sync::Notify::new(),
49 })
50 }
51
52 fn load_process_conn(
53 conn: &Connection,
54 process_id: &str,
55 ) -> Result<Option<ProcessRecord>, lash_core::PluginError> {
56 let json: Option<String> = conn
57 .query_row(
58 "SELECT record_json FROM processes WHERE process_id = ?1",
59 params![process_id],
60 |row| row.get(0),
61 )
62 .optional()
63 .map_err(process_sqlite_error)?;
64 json.map(|json| serde_json::from_str(&json).map_err(process_decode_error))
65 .transpose()
66 }
67
68 fn save_process_conn(
69 conn: &Connection,
70 record: &ProcessRecord,
71 ) -> Result<(), lash_core::PluginError> {
72 conn.execute(
73 "UPDATE processes
74 SET updated_at_ms = ?2, status = ?3, record_json = ?4
75 WHERE process_id = ?1",
76 params![
77 record.id.as_str(),
78 record.updated_at_ms as i64,
79 process_status_label(record),
80 process_encode_json(record)?
81 ],
82 )
83 .map_err(process_sqlite_error)?;
84 Ok(())
85 }
86
87 fn load_event_by_key_conn(
88 conn: &Connection,
89 process_id: &str,
90 replay_key: &str,
91 ) -> Result<Option<(String, ProcessEvent)>, lash_core::PluginError> {
92 let row: Option<(String, String)> = conn
93 .query_row(
94 "SELECT payload_hash, event_json
95 FROM process_events
96 WHERE process_id = ?1 AND idempotency_key = ?2",
97 params![process_id, replay_key],
98 |row| Ok((row.get(0)?, row.get(1)?)),
99 )
100 .optional()
101 .map_err(process_sqlite_error)?;
102 row.map(|(hash, json)| {
103 serde_json::from_str(&json)
104 .map(|event| (hash, event))
105 .map_err(process_decode_error)
106 })
107 .transpose()
108 }
109
110 fn load_process_lease_conn(
111 conn: &Connection,
112 process_id: &str,
113 ) -> Result<Option<ProcessLease>, lash_core::PluginError> {
114 conn.query_row(
115 "SELECT lease_owner_id, lease_token, lease_fencing_token,
116 lease_claimed_at_ms, lease_expires_at_ms
117 FROM process_leases
118 WHERE process_id = ?1",
119 params![process_id],
120 |row| {
121 let owner_id: Option<String> = row.get(0)?;
122 let lease_token: Option<String> = row.get(1)?;
123 let (Some(owner_id), Some(lease_token)) = (owner_id, lease_token) else {
124 return Ok(None);
125 };
126 Ok(Some(ProcessLease {
127 schema_version: PROCESS_LEASE_SCHEMA_VERSION,
128 process_id: process_id.to_string(),
129 owner_id,
130 lease_token,
131 fencing_token: row.get::<_, i64>(2)? as u64,
132 claimed_at_epoch_ms: row.get::<_, i64>(3)? as u64,
133 expires_at_epoch_ms: row.get::<_, i64>(4)? as u64,
134 }))
135 },
136 )
137 .optional()
138 .map(|lease| lease.flatten())
139 .map_err(process_sqlite_error)
140 }
141
142 fn list_grants_for_scope_conn(
143 conn: &Connection,
144 session_scope: &SessionScope,
145 live_only: bool,
146 ) -> Result<Vec<ProcessHandleGrantEntry>, lash_core::PluginError> {
147 let session_scope_id = session_scope.id();
148 let status_clause = if live_only {
149 "AND p.status = 'running'"
150 } else {
151 ""
152 };
153 let mut stmt = conn
154 .prepare(&format!(
155 "SELECT g.process_id, g.descriptor_json, p.record_json
156 FROM process_handle_grants g
157 JOIN processes p ON p.process_id = g.process_id
158 WHERE g.scope_id = ?1 {status_clause}
159 ORDER BY g.process_id ASC"
160 ))
161 .map_err(process_sqlite_error)?;
162 let rows = stmt
163 .query_map(params![session_scope_id.as_str()], |row| {
164 Ok((
165 row.get::<_, String>(0)?,
166 row.get::<_, String>(1)?,
167 row.get::<_, String>(2)?,
168 ))
169 })
170 .map_err(process_sqlite_error)?;
171 let mut entries = Vec::new();
172 for row in rows {
173 let (process_id, descriptor_json, record_json) = row.map_err(process_sqlite_error)?;
174 let descriptor: ProcessHandleDescriptor =
175 serde_json::from_str(&descriptor_json).map_err(process_decode_error)?;
176 let record: ProcessRecord =
177 serde_json::from_str(&record_json).map_err(process_decode_error)?;
178 entries.push((
179 ProcessHandleGrant {
180 session_id: session_scope.session_id.clone(),
181 process_id,
182 descriptor,
183 },
184 record,
185 ));
186 }
187 Ok(entries)
188 }
189}
190
191fn tx_outcome<T>(
196 result: Result<T, lash_core::PluginError>,
197) -> TxOutcome<Result<T, lash_core::PluginError>> {
198 match result {
199 Ok(value) => TxOutcome::Commit(Ok(value)),
200 Err(err) => TxOutcome::Rollback(Err(err)),
201 }
202}
203
204#[async_trait::async_trait]
205impl ProcessRegistry for SqliteProcessRegistry {
/// Reports the durability tier of this registry; this implementation always
/// answers [`DurabilityTier::Durable`].
fn durability_tier(&self) -> DurabilityTier {
    DurabilityTier::Durable
}
209
210 async fn register_process(
211 &self,
212 registration: ProcessRegistration,
213 ) -> Result<ProcessRecord, lash_core::PluginError> {
214 let (registration, registration_hash) = prepare_process_registration(registration)?;
215 let record = self
216 .conn
217 .write_flow(move |tx| {
218 Ok(tx_outcome((|| {
219 if let Some(existing) = Self::load_process_conn(tx, ®istration.id)? {
220 if existing.registration_hash == registration_hash {
221 return Ok(existing);
222 }
223 return Err(lash_core::PluginError::Session(format!(
224 "process `{}` registration hash conflict: existing {}, new {}",
225 registration.id, existing.registration_hash, registration_hash
226 )));
227 }
228 let now = current_epoch_ms();
229 let record = ProcessRecord::from_prepared_registration(
230 registration,
231 registration_hash,
232 now,
233 );
234 let originator_scope_id = record.originator_scope_id();
235 tx.execute(
236 "INSERT INTO processes (
237 process_id, registration_hash, owner_scope_id,
238 created_at_ms, updated_at_ms, status, record_json
239 )
240 VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7)",
241 params![
242 record.id.as_str(),
243 record.registration_hash.as_str(),
244 originator_scope_id.as_str(),
245 record.created_at_ms as i64,
246 record.updated_at_ms as i64,
247 process_status_label(&record),
248 process_encode_json(&record)?,
249 ],
250 )
251 .map_err(process_sqlite_error)?;
252 Ok(record)
253 })()))
254 })
255 .await
256 .map_err(process_sqlite_error)??;
257 self.notify.notify_waiters();
258 Ok(record)
259 }
260
261 async fn set_external_ref(
262 &self,
263 process_id: &str,
264 external_ref: ProcessExternalRef,
265 ) -> Result<ProcessRecord, lash_core::PluginError> {
266 let process_id = process_id.to_string();
267 let record = self
268 .conn
269 .write_flow(move |tx| {
270 Ok(tx_outcome((|| {
271 let mut record =
272 Self::load_process_conn(tx, &process_id)?.ok_or_else(|| {
273 lash_core::PluginError::Session(format!(
274 "unknown process `{process_id}`"
275 ))
276 })?;
277 record.external_ref = Some(external_ref);
278 record.updated_at_ms = current_epoch_ms();
279 Self::save_process_conn(tx, &record)?;
280 Ok(record)
281 })()))
282 })
283 .await
284 .map_err(process_sqlite_error)??;
285 self.notify.notify_waiters();
286 Ok(record)
287 }
288
289 async fn grant_handle(
290 &self,
291 session_scope: &SessionScope,
292 process_id: &str,
293 descriptor: ProcessHandleDescriptor,
294 ) -> Result<ProcessHandleGrant, lash_core::PluginError> {
295 let session_scope = session_scope.clone();
296 let process_id = process_id.to_string();
297 self.conn
298 .write_flow(move |tx| {
299 Ok(tx_outcome((|| {
300 let session_scope_id = session_scope.id();
301 if Self::load_process_conn(tx, &process_id)?.is_none() {
302 return Err(lash_core::PluginError::Session(format!(
303 "unknown process `{process_id}`"
304 )));
305 }
306 tx.execute(
307 "INSERT INTO process_handle_grants (session_id, scope_id, process_id, descriptor_json)
308 VALUES (?1, ?2, ?3, ?4)
309 ON CONFLICT(scope_id, process_id) DO UPDATE SET
310 session_id = excluded.session_id,
311 descriptor_json = excluded.descriptor_json",
312 params![
313 session_scope.session_id.as_str(),
314 session_scope_id.as_str(),
315 process_id.as_str(),
316 process_encode_json(&descriptor)?
317 ],
318 )
319 .map_err(process_sqlite_error)?;
320 Ok(ProcessHandleGrant {
321 session_id: session_scope.session_id.clone(),
322 process_id: process_id.clone(),
323 descriptor,
324 })
325 })()))
326 })
327 .await
328 .map_err(process_sqlite_error)?
329 }
330
331 async fn revoke_handle(
332 &self,
333 session_scope: &SessionScope,
334 process_id: &str,
335 ) -> Result<(), lash_core::PluginError> {
336 let session_scope_id = session_scope.id().as_str().to_string();
337 let process_id = process_id.to_string();
338 self.conn
339 .call(move |conn| {
340 conn.execute(
341 "DELETE FROM process_handle_grants WHERE scope_id = ?1 AND process_id = ?2",
342 params![session_scope_id, process_id],
343 )
344 })
345 .await
346 .map_err(process_sqlite_error)?;
347 Ok(())
348 }
349
350 async fn transfer_handle_grants(
351 &self,
352 from_scope: &SessionScope,
353 to_scope: &SessionScope,
354 process_ids: &[String],
355 ) -> Result<(), lash_core::PluginError> {
356 let from_scope = from_scope.clone();
357 let to_scope = to_scope.clone();
358 let process_ids = process_ids.to_vec();
359 self.conn
360 .write_flow(move |tx| {
361 Ok(tx_outcome((|| {
362 let from_scope_id = from_scope.id();
363 let to_scope_id = to_scope.id();
364 for process_id in &process_ids {
365 let descriptor_json: Option<String> = tx
366 .query_row(
367 "SELECT descriptor_json
368 FROM process_handle_grants
369 WHERE scope_id = ?1 AND process_id = ?2",
370 params![from_scope_id.as_str(), process_id.as_str()],
371 |row| row.get(0),
372 )
373 .optional()
374 .map_err(process_sqlite_error)?;
375 let Some(descriptor_json) = descriptor_json else {
376 return Err(lash_core::PluginError::Session(format!(
377 "process handle `{process_id}` is not granted to session `{}`",
378 from_scope.session_id
379 )));
380 };
381 tx.execute(
382 "DELETE FROM process_handle_grants
383 WHERE scope_id = ?1 AND process_id = ?2",
384 params![from_scope_id.as_str(), process_id.as_str()],
385 )
386 .map_err(process_sqlite_error)?;
387 tx.execute(
388 "INSERT INTO process_handle_grants (session_id, scope_id, process_id, descriptor_json)
389 VALUES (?1, ?2, ?3, ?4)
390 ON CONFLICT(scope_id, process_id) DO UPDATE SET
391 session_id = excluded.session_id,
392 descriptor_json = excluded.descriptor_json",
393 params![
394 to_scope.session_id.as_str(),
395 to_scope_id.as_str(),
396 process_id.as_str(),
397 descriptor_json
398 ],
399 )
400 .map_err(process_sqlite_error)?;
401 }
402 Ok(())
403 })()))
404 })
405 .await
406 .map_err(process_sqlite_error)?
407 }
408
409 async fn list_handle_grants(
410 &self,
411 session_scope: &SessionScope,
412 ) -> Result<Vec<ProcessHandleGrantEntry>, lash_core::PluginError> {
413 let session_scope = session_scope.clone();
414 self.conn
415 .call(move |conn| {
416 Ok(Self::list_grants_for_scope_conn(
417 conn,
418 &session_scope,
419 false,
420 ))
421 })
422 .await
423 .map_err(process_sqlite_error)?
424 }
425
426 async fn list_live_handle_grants(
427 &self,
428 session_scope: &SessionScope,
429 ) -> Result<Vec<ProcessHandleGrantEntry>, lash_core::PluginError> {
430 let session_scope = session_scope.clone();
431 self.conn
432 .call(move |conn| Ok(Self::list_grants_for_scope_conn(conn, &session_scope, true)))
433 .await
434 .map_err(process_sqlite_error)?
435 }
436
437 async fn has_handle_grant(
438 &self,
439 session_scope: &SessionScope,
440 process_id: &str,
441 ) -> Result<bool, lash_core::PluginError> {
442 let session_scope_id = session_scope.id().as_str().to_string();
443 let process_id = process_id.to_string();
444 self.conn
445 .call(move |conn| {
446 let exists = conn
447 .query_row(
448 "SELECT 1
449 FROM process_handle_grants g
450 JOIN processes p ON p.process_id = g.process_id
451 WHERE g.scope_id = ?1 AND g.process_id = ?2
452 LIMIT 1",
453 params![session_scope_id, process_id],
454 |_| Ok(()),
455 )
456 .optional()?
457 .is_some();
458 Ok(exists)
459 })
460 .await
461 .map_err(process_sqlite_error)
462 }
463
464 async fn handle_grants_for_process(
465 &self,
466 process_id: &str,
467 ) -> Result<Vec<ProcessHandleGrant>, lash_core::PluginError> {
468 let process_id = process_id.to_string();
469 self.conn
470 .call(move |conn| {
471 Ok((|| {
472 if Self::load_process_conn(conn, &process_id)?.is_none() {
473 return Err(lash_core::PluginError::Session(format!(
474 "unknown process `{process_id}`"
475 )));
476 }
477 let mut stmt = conn
478 .prepare(
479 "SELECT session_id, descriptor_json
480 FROM process_handle_grants
481 WHERE process_id = ?1
482 ORDER BY session_id ASC, scope_id ASC",
483 )
484 .map_err(process_sqlite_error)?;
485 let rows = stmt
486 .query_map(params![process_id], |row| {
487 Ok((row.get::<_, String>(0)?, row.get::<_, String>(1)?))
488 })
489 .map_err(process_sqlite_error)?;
490 let mut grants = Vec::new();
491 for row in rows {
492 let (session_id, descriptor_json) = row.map_err(process_sqlite_error)?;
493 let descriptor: ProcessHandleDescriptor =
494 serde_json::from_str(&descriptor_json).map_err(process_decode_error)?;
495 grants.push(ProcessHandleGrant {
496 session_id,
497 process_id: process_id.clone(),
498 descriptor,
499 });
500 }
501 Ok(grants)
502 })())
503 })
504 .await
505 .map_err(process_sqlite_error)?
506 }
507
508 async fn delete_session_process_state(
509 &self,
510 session_id: &str,
511 ) -> Result<lash_core::ProcessSessionDeleteReport, lash_core::PluginError> {
512 let session_id_owned = session_id.to_string();
513 let (
514 revoked_handle_count,
515 deleted_wake_count,
516 mut orphaned_process_ids,
517 mut preserved_process_ids,
518 ) = self
519 .conn
520 .write_flow(move |tx| {
521 Ok(tx_outcome((|| {
522 let session_id = session_id_owned;
523 let removed = {
524 let mut stmt = tx
525 .prepare(
526 "SELECT g.process_id, p.record_json
527 FROM process_handle_grants g
528 JOIN processes p ON p.process_id = g.process_id
529 WHERE g.session_id = ?1
530 ORDER BY g.process_id ASC",
531 )
532 .map_err(process_sqlite_error)?;
533 let rows = stmt
534 .query_map(params![session_id], |row| {
535 Ok((row.get::<_, String>(0)?, row.get::<_, String>(1)?))
536 })
537 .map_err(process_sqlite_error)?;
538 let mut removed = Vec::new();
539 for row in rows {
540 let (process_id, record_json) = row.map_err(process_sqlite_error)?;
541 let record: ProcessRecord =
542 serde_json::from_str(&record_json).map_err(process_decode_error)?;
543 removed.push((process_id, record));
544 }
545 removed
546 };
547
548 let deleted_wake_count = 0;
553 let revoked_handle_count = tx
554 .execute(
555 "DELETE FROM process_handle_grants WHERE session_id = ?1",
556 params![session_id],
557 )
558 .map_err(process_sqlite_error)?;
559 let mut orphaned_process_ids = Vec::new();
560 let mut preserved_process_ids = Vec::new();
561 for (process_id, record) in removed {
562 if record.is_terminal() {
563 continue;
564 }
565 let remaining_grants: i64 = tx
566 .query_row(
567 "SELECT COUNT(*) FROM process_handle_grants WHERE process_id = ?1",
568 params![process_id],
569 |row| row.get(0),
570 )
571 .map_err(process_sqlite_error)?;
572 if remaining_grants == 0 {
573 orphaned_process_ids.push(process_id);
574 } else {
575 preserved_process_ids.push(process_id);
576 }
577 }
578 Ok((
579 revoked_handle_count,
580 deleted_wake_count,
581 orphaned_process_ids,
582 preserved_process_ids,
583 ))
584 })()))
585 })
586 .await
587 .map_err(process_sqlite_error)??;
588 orphaned_process_ids.sort();
589 orphaned_process_ids.dedup();
590 preserved_process_ids.sort();
591 preserved_process_ids.dedup();
592 Ok(lash_core::ProcessSessionDeleteReport {
593 session_id: session_id.to_string(),
594 revoked_handle_count,
595 deleted_wake_count,
596 orphaned_process_ids,
597 preserved_process_ids,
598 })
599 }
600
601 async fn append_event(
602 &self,
603 process_id: &str,
604 request: ProcessEventAppendRequest,
605 ) -> Result<ProcessEventAppendResult, lash_core::PluginError> {
606 let process_id = process_id.to_string();
607 let (result, appended) = self
608 .conn
609 .write_flow(move |tx| {
610 Ok(tx_outcome((|| {
611 let mut record =
612 Self::load_process_conn(tx, &process_id)?.ok_or_else(|| {
613 lash_core::PluginError::Session(format!(
614 "unknown process `{process_id}`"
615 ))
616 })?;
617 let replay_lookup = if let Some(replay_key) =
618 request.replay.as_ref().map(|replay| replay.key.as_str())
619 {
620 Self::load_event_by_key_conn(tx, &process_id, replay_key)?
621 } else {
622 None
623 };
624 let sequence = tx
625 .query_row(
626 "SELECT COALESCE(MAX(sequence), 0) + 1 FROM process_events WHERE process_id = ?1",
627 params![process_id],
628 |row| row.get::<_, i64>(0),
629 )
630 .map_err(process_sqlite_error)? as u64;
631 let occurred_at_ms = current_epoch_ms();
632 let prepared = prepare_process_event_append(
633 &record,
634 request,
635 sequence,
636 replay_lookup,
637 occurred_at_ms,
638 )?;
639 if prepared.replayed {
640 return Ok((
641 ProcessEventAppendResult {
642 event: prepared.event,
643 wake_delivery: prepared.wake_delivery,
644 },
645 false,
646 ));
647 }
648 let event = prepared.event;
649 tx.execute(
650 "INSERT INTO process_events (
651 process_id, sequence, event_type, payload_hash, idempotency_key,
652 occurred_at_ms, event_json
653 )
654 VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7)",
655 params![
656 process_id,
657 sequence as i64,
658 event.event_type.as_str(),
659 prepared.payload_hash.as_str(),
660 event.invocation.replay_key(),
661 prepared.occurred_at_ms as i64,
662 process_encode_json(&event)?,
663 ],
664 )
665 .map_err(process_sqlite_error)?;
666 if let Some(status) = prepared.status_update.clone() {
667 record.status = status;
668 if record.status.is_terminal() {
669 record.wait = None;
670 }
671 }
672 record.updated_at_ms = prepared.occurred_at_ms;
673 Self::save_process_conn(tx, &record)?;
674 Ok((
675 ProcessEventAppendResult {
676 event,
677 wake_delivery: prepared.wake_delivery,
678 },
679 true,
680 ))
681 })()))
682 })
683 .await
684 .map_err(process_sqlite_error)??;
685 if appended {
686 self.notify.notify_waiters();
687 }
688 Ok(result)
689 }
690
691 async fn events_after(
692 &self,
693 process_id: &str,
694 after_sequence: u64,
695 ) -> Result<Vec<ProcessEvent>, lash_core::PluginError> {
696 let process_id = process_id.to_string();
697 self.conn
698 .call(move |conn| {
699 Ok((|| {
700 if Self::load_process_conn(conn, &process_id)?.is_none() {
701 return Err(lash_core::PluginError::Session(format!(
702 "unknown process `{process_id}`"
703 )));
704 }
705 let mut stmt = conn
706 .prepare(
707 "SELECT event_json FROM process_events
708 WHERE process_id = ?1 AND sequence > ?2
709 ORDER BY sequence ASC",
710 )
711 .map_err(process_sqlite_error)?;
712 let rows = stmt
713 .query_map(params![process_id, after_sequence as i64], |row| {
714 row.get::<_, String>(0)
715 })
716 .map_err(process_sqlite_error)?;
717 let mut events = Vec::new();
718 for row in rows {
719 events.push(
720 serde_json::from_str(&row.map_err(process_sqlite_error)?)
721 .map_err(process_decode_error)?,
722 );
723 }
724 Ok(events)
725 })())
726 })
727 .await
728 .map_err(process_sqlite_error)?
729 }
730
731 async fn count_events_through(
732 &self,
733 process_id: &str,
734 event_type: &str,
735 up_to_sequence: u64,
736 ) -> Result<u64, lash_core::PluginError> {
737 let process_id = process_id.to_string();
738 let event_type = event_type.to_string();
739 self.conn
740 .call(move |conn| {
741 Ok((|| {
742 if Self::load_process_conn(conn, &process_id)?.is_none() {
743 return Err(lash_core::PluginError::Session(format!(
744 "unknown process `{process_id}`"
745 )));
746 }
747 conn.query_row(
748 "SELECT COUNT(*) FROM process_events
749 WHERE process_id = ?1 AND event_type = ?2 AND sequence <= ?3",
750 params![process_id, event_type, up_to_sequence as i64],
751 |row| row.get::<_, i64>(0),
752 )
753 .map(|count| count as u64)
754 .map_err(process_sqlite_error)
755 })())
756 })
757 .await
758 .map_err(process_sqlite_error)?
759 }
760
761 async fn recent_events(
762 &self,
763 process_id: &str,
764 limit: usize,
765 ) -> Result<Vec<ProcessEvent>, lash_core::PluginError> {
766 let process_id = process_id.to_string();
767 self.conn
768 .call(move |conn| {
769 Ok((|| {
770 if Self::load_process_conn(conn, &process_id)?.is_none() {
771 return Err(lash_core::PluginError::Session(format!(
772 "unknown process `{process_id}`"
773 )));
774 }
775 let mut stmt = conn
776 .prepare(
777 "SELECT event_json FROM process_events
778 WHERE process_id = ?1
779 ORDER BY sequence DESC
780 LIMIT ?2",
781 )
782 .map_err(process_sqlite_error)?;
783 let rows = stmt
784 .query_map(params![process_id, limit as i64], |row| {
785 row.get::<_, String>(0)
786 })
787 .map_err(process_sqlite_error)?;
788 let mut events: Vec<ProcessEvent> = Vec::new();
789 for row in rows {
790 events.push(
791 serde_json::from_str(&row.map_err(process_sqlite_error)?)
792 .map_err(process_decode_error)?,
793 );
794 }
795 events.reverse();
796 Ok(events)
797 })())
798 })
799 .await
800 .map_err(process_sqlite_error)?
801 }
802
803 async fn wake_events_after(
804 &self,
805 process_id: &str,
806 after_sequence: u64,
807 ) -> Result<Vec<ProcessEvent>, lash_core::PluginError> {
808 let acked: std::collections::HashSet<u64> = {
809 let process_id = process_id.to_string();
810 self.conn
811 .call(move |conn| {
812 Ok(
813 (|| -> Result<std::collections::HashSet<u64>, lash_core::PluginError> {
814 let mut stmt = conn
815 .prepare(
816 "SELECT sequence FROM process_wake_acks WHERE process_id = ?1",
817 )
818 .map_err(process_sqlite_error)?;
819 let rows = stmt
820 .query_map(params![process_id], |row| row.get::<_, i64>(0))
821 .map_err(process_sqlite_error)?;
822 let mut set = std::collections::HashSet::new();
823 for row in rows {
824 set.insert(row.map_err(process_sqlite_error)? as u64);
825 }
826 Ok(set)
827 })(),
828 )
829 })
830 .await
831 .map_err(process_sqlite_error)??
832 };
833 Ok(self
834 .events_after(process_id, after_sequence)
835 .await?
836 .into_iter()
837 .filter(|event| event.semantics.wake.is_some() && !acked.contains(&event.sequence))
838 .collect())
839 }
840
841 async fn wait_event_after(
842 &self,
843 process_id: &str,
844 event_type: &str,
845 after_sequence: u64,
846 ) -> Result<ProcessEvent, lash_core::PluginError> {
847 loop {
848 if let Some(event) = self
849 .events_after(process_id, after_sequence)
850 .await?
851 .into_iter()
852 .find(|event| event.event_type == event_type)
853 {
854 return Ok(event);
855 }
856 tokio::select! {
857 _ = self.notify.notified() => {}
858 _ = tokio::time::sleep(Duration::from_millis(50)) => {}
859 }
860 }
861 }
862
863 async fn await_process(
864 &self,
865 process_id: &str,
866 ) -> Result<ProcessAwaitOutput, lash_core::PluginError> {
867 loop {
868 let record = self.get_process(process_id).await.ok_or_else(|| {
869 lash_core::PluginError::Session(format!("unknown process `{process_id}`"))
870 })?;
871 if let Some(await_output) = record.status.await_output() {
872 return Ok(await_output.clone());
873 }
874 tokio::select! {
875 _ = self.notify.notified() => {}
876 _ = tokio::time::sleep(Duration::from_millis(50)) => {}
877 }
878 }
879 }
880
881 async fn complete_process(
882 &self,
883 process_id: &str,
884 await_output: ProcessAwaitOutput,
885 ) -> Result<ProcessRecord, lash_core::PluginError> {
886 let event_type = match await_output.terminal_state() {
887 lash_core::ProcessTerminalState::Completed => "process.completed",
888 lash_core::ProcessTerminalState::Failed => "process.failed",
889 lash_core::ProcessTerminalState::Cancelled => "process.cancelled",
890 };
891 self.append_event(
892 process_id,
893 ProcessEventAppendRequest::new(
894 event_type,
895 serde_json::json!({ "await_output": await_output }),
896 )
897 .with_replay_key(format!("process:{process_id}:terminal:{event_type}")),
898 )
899 .await?;
900 self.get_process(process_id).await.ok_or_else(|| {
901 lash_core::PluginError::Session(format!(
902 "unknown process `{process_id}` after terminal event"
903 ))
904 })
905 }
906
907 async fn set_process_wait(
908 &self,
909 process_id: &str,
910 wait: lash_core::WaitState,
911 ) -> Result<ProcessRecord, lash_core::PluginError> {
912 let process_id = process_id.to_string();
913 self.conn
914 .write_flow(move |tx| {
915 Ok(tx_outcome((|| {
916 let mut record =
917 Self::load_process_conn(tx, &process_id)?.ok_or_else(|| {
918 lash_core::PluginError::Session(format!(
919 "unknown process `{process_id}`"
920 ))
921 })?;
922 if record.is_terminal() {
923 return Err(lash_core::PluginError::Session(format!(
924 "terminal process `{process_id}` cannot enter a wait state"
925 )));
926 }
927 record.wait = Some(wait);
928 record.updated_at_ms = current_epoch_ms();
929 Self::save_process_conn(tx, &record)?;
930 Ok(record)
931 })()))
932 })
933 .await
934 .map_err(process_sqlite_error)?
935 }
936
937 async fn clear_process_wait(
938 &self,
939 process_id: &str,
940 ) -> Result<ProcessRecord, lash_core::PluginError> {
941 let process_id = process_id.to_string();
942 self.conn
943 .write_flow(move |tx| {
944 Ok(tx_outcome((|| {
945 let mut record =
946 Self::load_process_conn(tx, &process_id)?.ok_or_else(|| {
947 lash_core::PluginError::Session(format!(
948 "unknown process `{process_id}`"
949 ))
950 })?;
951 record.wait = None;
952 record.updated_at_ms = current_epoch_ms();
953 Self::save_process_conn(tx, &record)?;
954 Ok(record)
955 })()))
956 })
957 .await
958 .map_err(process_sqlite_error)?
959 }
960
961 async fn get_process(&self, process_id: &str) -> Option<ProcessRecord> {
962 let process_id = process_id.to_string();
963 self.conn
964 .call(move |conn| Ok(Self::load_process_conn(conn, &process_id).ok().flatten()))
965 .await
966 .ok()
967 .flatten()
968 }
969
970 async fn list_processes(
971 &self,
972 filter: &lash_core::ProcessListFilter,
973 ) -> Result<Vec<ProcessRecord>, lash_core::PluginError> {
974 let filter = filter.clone();
975 self.conn
976 .call(move |conn| {
977 Ok((|| {
978 let mut stmt = conn
979 .prepare(
980 "SELECT record_json FROM processes
981 ORDER BY process_id ASC",
982 )
983 .map_err(process_sqlite_error)?;
984 let rows = stmt
985 .query_map([], |row| row.get::<_, String>(0))
986 .map_err(process_sqlite_error)?;
987 let mut records = Vec::new();
988 for row in rows {
989 let record: ProcessRecord =
990 serde_json::from_str(&row.map_err(process_sqlite_error)?)
991 .map_err(process_decode_error)?;
992 if filter.matches_record(&record) {
993 records.push(record);
994 }
995 }
996 Ok(records)
997 })())
998 })
999 .await
1000 .map_err(process_sqlite_error)?
1001 }
1002
1003 async fn ack_wake(
1004 &self,
1005 process_id: &str,
1006 sequence: u64,
1007 ) -> Result<(), lash_core::PluginError> {
1008 let process_id = process_id.to_string();
1009 self.conn
1010 .call(move |conn| {
1011 Ok((|| {
1012 if Self::load_process_conn(conn, &process_id)?.is_none() {
1013 return Err(lash_core::PluginError::Session(format!(
1014 "unknown process `{process_id}`"
1015 )));
1016 }
1017 conn.execute(
1018 "INSERT OR IGNORE INTO process_wake_acks (process_id, sequence) VALUES (?1, ?2)",
1019 params![process_id, sequence as i64],
1020 )
1021 .map_err(process_sqlite_error)?;
1022 Ok(())
1023 })())
1024 })
1025 .await
1026 .map_err(process_sqlite_error)?
1027 }
1028
1029 async fn list_non_terminal(&self) -> Result<Vec<ProcessRecord>, lash_core::PluginError> {
1030 self.conn
1031 .call(move |conn| {
1032 Ok((|| {
1033 let mut stmt = conn
1034 .prepare(
1035 "SELECT record_json FROM processes
1036 WHERE status = 'running'
1037 ORDER BY process_id ASC",
1038 )
1039 .map_err(process_sqlite_error)?;
1040 let rows = stmt
1041 .query_map([], |row| row.get::<_, String>(0))
1042 .map_err(process_sqlite_error)?;
1043 let mut records = Vec::new();
1044 for row in rows {
1045 let record: ProcessRecord =
1046 serde_json::from_str(&row.map_err(process_sqlite_error)?)
1047 .map_err(process_decode_error)?;
1048 records.push(record);
1049 }
1050 Ok(records)
1051 })())
1052 })
1053 .await
1054 .map_err(process_sqlite_error)?
1055 }
1056
1057 async fn claim_process_lease(
1058 &self,
1059 process_id: &str,
1060 owner_id: &str,
1061 lease_ttl_ms: u64,
1062 ) -> Result<ProcessLease, lash_core::PluginError> {
1063 let process_id = process_id.to_string();
1064 let owner_id = owner_id.to_string();
1065 self.conn
1066 .write_flow(move |tx| {
1067 Ok(tx_outcome((|| {
1068 if Self::load_process_conn(tx, &process_id)?.is_none() {
1069 return Err(lash_core::PluginError::Session(format!(
1070 "unknown process `{process_id}`"
1071 )));
1072 }
1073 let now = current_epoch_ms();
1074 let current = Self::load_process_lease_conn(tx, &process_id)?;
1075 if let Some(current) = current.as_ref()
1076 && current.expires_at_epoch_ms > now
1077 && current.owner_id != owner_id
1078 {
1079 return Err(process_lease_conflict(&process_id, current));
1080 }
1081 let fencing_token: u64 = tx
1086 .query_row(
1087 "SELECT lease_fencing_token FROM process_leases WHERE process_id = ?1",
1088 params![process_id],
1089 |row| row.get::<_, i64>(0),
1090 )
1091 .optional()
1092 .map_err(process_sqlite_error)?
1093 .unwrap_or(0) as u64
1094 + 1;
1095 let lease = ProcessLease {
1096 schema_version: PROCESS_LEASE_SCHEMA_VERSION,
1097 process_id: process_id.clone(),
1098 owner_id: owner_id.clone(),
1099 lease_token: format!(
1100 "{:x}",
1101 Sha256::digest(
1102 format!("{process_id}:{owner_id}:{now}:{fencing_token}").as_bytes()
1103 )
1104 ),
1105 fencing_token,
1106 claimed_at_epoch_ms: now,
1107 expires_at_epoch_ms: now.saturating_add(lease_ttl_ms),
1108 };
1109 tx.execute(
1110 "INSERT INTO process_leases (
1111 process_id, lease_owner_id, lease_token, lease_fencing_token,
1112 lease_claimed_at_ms, lease_expires_at_ms
1113 )
1114 VALUES (?1, ?2, ?3, ?4, ?5, ?6)
1115 ON CONFLICT(process_id) DO UPDATE SET
1116 lease_owner_id = excluded.lease_owner_id,
1117 lease_token = excluded.lease_token,
1118 lease_fencing_token = excluded.lease_fencing_token,
1119 lease_claimed_at_ms = excluded.lease_claimed_at_ms,
1120 lease_expires_at_ms = excluded.lease_expires_at_ms",
1121 params![
1122 lease.process_id.as_str(),
1123 lease.owner_id.as_str(),
1124 lease.lease_token.as_str(),
1125 lease.fencing_token as i64,
1126 lease.claimed_at_epoch_ms as i64,
1127 lease.expires_at_epoch_ms as i64,
1128 ],
1129 )
1130 .map_err(process_sqlite_error)?;
1131 Ok(lease)
1132 })()))
1133 })
1134 .await
1135 .map_err(process_sqlite_error)?
1136 }
1137
1138 async fn renew_process_lease(
1139 &self,
1140 lease: &ProcessLease,
1141 lease_ttl_ms: u64,
1142 ) -> Result<ProcessLease, lash_core::PluginError> {
1143 let lease = lease.clone();
1144 self.conn
1145 .write_flow(move |tx| {
1146 Ok(tx_outcome((|| {
1147 let now = current_epoch_ms();
1148 let current = Self::load_process_lease_conn(tx, &lease.process_id)?;
1149 if !guard_lease(current.as_ref(), &lease.lease_token, now) {
1150 return Err(process_lease_expired(&lease.process_id));
1151 }
1152 let renewed = ProcessLease {
1153 expires_at_epoch_ms: now.saturating_add(lease_ttl_ms),
1154 ..lease.clone()
1155 };
1156 tx.execute(
1157 "UPDATE process_leases
1158 SET lease_expires_at_ms = ?2
1159 WHERE process_id = ?1 AND lease_token = ?3",
1160 params![
1161 renewed.process_id.as_str(),
1162 renewed.expires_at_epoch_ms as i64,
1163 renewed.lease_token.as_str(),
1164 ],
1165 )
1166 .map_err(process_sqlite_error)?;
1167 Ok(renewed)
1168 })()))
1169 })
1170 .await
1171 .map_err(process_sqlite_error)?
1172 }
1173
1174 async fn complete_process_lease(
1175 &self,
1176 completion: &ProcessLeaseCompletion,
1177 ) -> Result<(), lash_core::PluginError> {
1178 let process_id = completion.process_id.clone();
1179 let lease_token = completion.lease_token.clone();
1180 self.conn
1181 .call(move |conn| {
1182 conn.execute(
1183 "UPDATE process_leases
1184 SET lease_owner_id = NULL,
1185 lease_token = NULL,
1186 lease_claimed_at_ms = 0,
1187 lease_expires_at_ms = 0
1188 WHERE process_id = ?1 AND lease_token = ?2",
1189 params![process_id, lease_token],
1190 )
1191 })
1192 .await
1193 .map_err(process_sqlite_error)?;
1194 Ok(())
1195 }
1196}
1197
1198fn process_lease_conflict(process_id: &str, current: &ProcessLease) -> lash_core::PluginError {
1201 lash_core::PluginError::Session(format!(
1202 "process `{process_id}` is already leased by `{}` until {}",
1203 current.owner_id, current.expires_at_epoch_ms
1204 ))
1205}
1206
1207fn process_lease_expired(process_id: &str) -> lash_core::PluginError {
1209 lash_core::PluginError::Session(format!(
1210 "process lease for `{process_id}` is missing or expired"
1211 ))
1212}