1use super::*;
26
27fn process_status_label(record: &ProcessRecord) -> &'static str {
28 record.status.label()
29}
30
31impl SqliteProcessRegistry {
32 pub async fn open(path: &Path) -> tokio_rusqlite::Result<Self> {
33 let conn = SqliteConnection::open(path).await?;
34 ensure_process_schema(&conn).await?;
35 apply_pragmas(&conn, StoreBacking::File).await?;
36 Ok(Self {
37 conn,
38 notify: tokio::sync::Notify::new(),
39 })
40 }
41
42 pub async fn memory() -> tokio_rusqlite::Result<Self> {
43 let conn = SqliteConnection::open_in_memory().await?;
44 ensure_process_schema(&conn).await?;
45 apply_pragmas(&conn, StoreBacking::Memory).await?;
46 Ok(Self {
47 conn,
48 notify: tokio::sync::Notify::new(),
49 })
50 }
51
52 fn load_process_conn(
53 conn: &Connection,
54 process_id: &str,
55 ) -> Result<Option<ProcessRecord>, lash_core::PluginError> {
56 let json: Option<String> = conn
57 .query_row(
58 "SELECT record_json FROM processes WHERE process_id = ?1",
59 params![process_id],
60 |row| row.get(0),
61 )
62 .optional()
63 .map_err(process_sqlite_error)?;
64 json.map(|json| serde_json::from_str(&json).map_err(process_decode_error))
65 .transpose()
66 }
67
68 fn save_process_conn(
69 conn: &Connection,
70 record: &ProcessRecord,
71 ) -> Result<(), lash_core::PluginError> {
72 conn.execute(
73 "UPDATE processes
74 SET updated_at_ms = ?2, status = ?3, record_json = ?4
75 WHERE process_id = ?1",
76 params![
77 record.id.as_str(),
78 record.updated_at_ms as i64,
79 process_status_label(record),
80 process_encode_json(record)?
81 ],
82 )
83 .map_err(process_sqlite_error)?;
84 Ok(())
85 }
86
87 fn load_event_by_key_conn(
88 conn: &Connection,
89 process_id: &str,
90 replay_key: &str,
91 ) -> Result<Option<(String, ProcessEvent)>, lash_core::PluginError> {
92 let row: Option<(String, String)> = conn
93 .query_row(
94 "SELECT payload_hash, event_json
95 FROM process_events
96 WHERE process_id = ?1 AND idempotency_key = ?2",
97 params![process_id, replay_key],
98 |row| Ok((row.get(0)?, row.get(1)?)),
99 )
100 .optional()
101 .map_err(process_sqlite_error)?;
102 row.map(|(hash, json)| {
103 serde_json::from_str(&json)
104 .map(|event| (hash, event))
105 .map_err(process_decode_error)
106 })
107 .transpose()
108 }
109
110 fn load_process_lease_conn(
111 conn: &Connection,
112 process_id: &str,
113 ) -> Result<Option<ProcessLease>, lash_core::PluginError> {
114 conn.query_row(
115 "SELECT lease_owner_id, lease_token, lease_fencing_token,
116 lease_claimed_at_ms, lease_expires_at_ms
117 FROM process_leases
118 WHERE process_id = ?1",
119 params![process_id],
120 |row| {
121 let owner_id: Option<String> = row.get(0)?;
122 let lease_token: Option<String> = row.get(1)?;
123 let (Some(owner_id), Some(lease_token)) = (owner_id, lease_token) else {
124 return Ok(None);
125 };
126 Ok(Some(ProcessLease {
127 schema_version: PROCESS_LEASE_SCHEMA_VERSION,
128 process_id: process_id.to_string(),
129 owner_id,
130 lease_token,
131 fencing_token: row.get::<_, i64>(2)? as u64,
132 claimed_at_epoch_ms: row.get::<_, i64>(3)? as u64,
133 expires_at_epoch_ms: row.get::<_, i64>(4)? as u64,
134 }))
135 },
136 )
137 .optional()
138 .map(|lease| lease.flatten())
139 .map_err(process_sqlite_error)
140 }
141
142 fn list_grants_for_scope_conn(
143 conn: &Connection,
144 session_scope: &SessionScope,
145 live_only: bool,
146 ) -> Result<Vec<ProcessHandleGrantEntry>, lash_core::PluginError> {
147 let session_scope_id = session_scope.id();
148 let status_clause = if live_only {
149 "AND p.status = 'running'"
150 } else {
151 ""
152 };
153 let mut stmt = conn
154 .prepare(&format!(
155 "SELECT g.process_id, g.descriptor_json, p.record_json
156 FROM process_handle_grants g
157 JOIN processes p ON p.process_id = g.process_id
158 WHERE g.scope_id = ?1 {status_clause}
159 ORDER BY g.process_id ASC"
160 ))
161 .map_err(process_sqlite_error)?;
162 let rows = stmt
163 .query_map(params![session_scope_id.as_str()], |row| {
164 Ok((
165 row.get::<_, String>(0)?,
166 row.get::<_, String>(1)?,
167 row.get::<_, String>(2)?,
168 ))
169 })
170 .map_err(process_sqlite_error)?;
171 let mut entries = Vec::new();
172 for row in rows {
173 let (process_id, descriptor_json, record_json) = row.map_err(process_sqlite_error)?;
174 let descriptor: ProcessHandleDescriptor =
175 serde_json::from_str(&descriptor_json).map_err(process_decode_error)?;
176 let record: ProcessRecord =
177 serde_json::from_str(&record_json).map_err(process_decode_error)?;
178 entries.push((
179 ProcessHandleGrant {
180 session_id: session_scope.session_id.clone(),
181 process_id,
182 descriptor,
183 },
184 record,
185 ));
186 }
187 Ok(entries)
188 }
189}
190
191fn tx_outcome<T>(
196 result: Result<T, lash_core::PluginError>,
197) -> TxOutcome<Result<T, lash_core::PluginError>> {
198 match result {
199 Ok(value) => TxOutcome::Commit(Ok(value)),
200 Err(err) => TxOutcome::Rollback(Err(err)),
201 }
202}
203
204#[async_trait::async_trait]
205impl ProcessRegistry for SqliteProcessRegistry {
    /// Reports the durability tier of this registry, which is always
    /// `DurabilityTier::Durable`.
    fn durability_tier(&self) -> DurabilityTier {
        DurabilityTier::Durable
    }
209
210 async fn register_process(
211 &self,
212 registration: ProcessRegistration,
213 ) -> Result<ProcessRecord, lash_core::PluginError> {
214 let (registration, registration_hash) = prepare_process_registration(registration)?;
215 let record = self
216 .conn
217 .write_flow(move |tx| {
218 Ok(tx_outcome((|| {
219 if let Some(existing) = Self::load_process_conn(tx, ®istration.id)? {
220 if existing.registration_hash == registration_hash {
221 return Ok(existing);
222 }
223 return Err(lash_core::PluginError::Session(format!(
224 "process `{}` registration hash conflict: existing {}, new {}",
225 registration.id, existing.registration_hash, registration_hash
226 )));
227 }
228 let now = current_epoch_ms();
229 let record = ProcessRecord::from_prepared_registration(
230 registration,
231 registration_hash,
232 now,
233 );
234 let originator_scope_id = record.originator_scope_id();
235 tx.execute(
236 "INSERT INTO processes (
237 process_id, registration_hash, owner_scope_id,
238 created_at_ms, updated_at_ms, status, record_json
239 )
240 VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7)",
241 params![
242 record.id.as_str(),
243 record.registration_hash.as_str(),
244 originator_scope_id.as_str(),
245 record.created_at_ms as i64,
246 record.updated_at_ms as i64,
247 process_status_label(&record),
248 process_encode_json(&record)?,
249 ],
250 )
251 .map_err(process_sqlite_error)?;
252 Ok(record)
253 })()))
254 })
255 .await
256 .map_err(process_sqlite_error)??;
257 self.notify.notify_waiters();
258 Ok(record)
259 }
260
261 async fn set_external_ref(
262 &self,
263 process_id: &str,
264 external_ref: ProcessExternalRef,
265 ) -> Result<ProcessRecord, lash_core::PluginError> {
266 let process_id = process_id.to_string();
267 let record = self
268 .conn
269 .write_flow(move |tx| {
270 Ok(tx_outcome((|| {
271 let mut record =
272 Self::load_process_conn(tx, &process_id)?.ok_or_else(|| {
273 lash_core::PluginError::Session(format!(
274 "unknown process `{process_id}`"
275 ))
276 })?;
277 record.external_ref = Some(external_ref);
278 record.updated_at_ms = current_epoch_ms();
279 Self::save_process_conn(tx, &record)?;
280 Ok(record)
281 })()))
282 })
283 .await
284 .map_err(process_sqlite_error)??;
285 self.notify.notify_waiters();
286 Ok(record)
287 }
288
289 async fn grant_handle(
290 &self,
291 session_scope: &SessionScope,
292 process_id: &str,
293 descriptor: ProcessHandleDescriptor,
294 ) -> Result<ProcessHandleGrant, lash_core::PluginError> {
295 let session_scope = session_scope.clone();
296 let process_id = process_id.to_string();
297 self.conn
298 .write_flow(move |tx| {
299 Ok(tx_outcome((|| {
300 let session_scope_id = session_scope.id();
301 if Self::load_process_conn(tx, &process_id)?.is_none() {
302 return Err(lash_core::PluginError::Session(format!(
303 "unknown process `{process_id}`"
304 )));
305 }
306 tx.execute(
307 "INSERT INTO process_handle_grants (session_id, scope_id, process_id, descriptor_json)
308 VALUES (?1, ?2, ?3, ?4)
309 ON CONFLICT(scope_id, process_id) DO UPDATE SET
310 session_id = excluded.session_id,
311 descriptor_json = excluded.descriptor_json",
312 params![
313 session_scope.session_id.as_str(),
314 session_scope_id.as_str(),
315 process_id.as_str(),
316 process_encode_json(&descriptor)?
317 ],
318 )
319 .map_err(process_sqlite_error)?;
320 Ok(ProcessHandleGrant {
321 session_id: session_scope.session_id.clone(),
322 process_id: process_id.clone(),
323 descriptor,
324 })
325 })()))
326 })
327 .await
328 .map_err(process_sqlite_error)?
329 }
330
331 async fn revoke_handle(
332 &self,
333 session_scope: &SessionScope,
334 process_id: &str,
335 ) -> Result<(), lash_core::PluginError> {
336 let session_scope_id = session_scope.id().as_str().to_string();
337 let process_id = process_id.to_string();
338 self.conn
339 .call(move |conn| {
340 conn.execute(
341 "DELETE FROM process_handle_grants WHERE scope_id = ?1 AND process_id = ?2",
342 params![session_scope_id, process_id],
343 )
344 })
345 .await
346 .map_err(process_sqlite_error)?;
347 Ok(())
348 }
349
    /// Moves the handle grants for `process_ids` from `from_scope` to
    /// `to_scope` inside a single `write_flow` closure.
    ///
    /// For each process id the grant row of `from_scope` is read, deleted, and
    /// re-inserted under `to_scope` with the same `descriptor_json`. A grant
    /// that already exists on the target scope is overwritten by the upsert.
    ///
    /// # Errors
    ///
    /// Returns `PluginError::Session` when a listed process has no grant on
    /// `from_scope`. That error, like any SQLite failure, is passed through
    /// `tx_outcome`, which maps errors to `TxOutcome::Rollback`.
    async fn transfer_handle_grants(
        &self,
        from_scope: &SessionScope,
        to_scope: &SessionScope,
        process_ids: &[String],
    ) -> Result<(), lash_core::PluginError> {
        let from_scope = from_scope.clone();
        let to_scope = to_scope.clone();
        let process_ids = process_ids.to_vec();
        self.conn
            .write_flow(move |tx| {
                Ok(tx_outcome((|| {
                    let from_scope_id = from_scope.id();
                    let to_scope_id = to_scope.id();
                    for process_id in &process_ids {
                        // Read the source grant first; its descriptor is
                        // carried over verbatim.
                        let descriptor_json: Option<String> = tx
                            .query_row(
                                "SELECT descriptor_json
                                 FROM process_handle_grants
                                 WHERE scope_id = ?1 AND process_id = ?2",
                                params![from_scope_id.as_str(), process_id.as_str()],
                                |row| row.get(0),
                            )
                            .optional()
                            .map_err(process_sqlite_error)?;
                        let Some(descriptor_json) = descriptor_json else {
                            return Err(lash_core::PluginError::Session(format!(
                                "process handle `{process_id}` is not granted to session `{}`",
                                from_scope.session_id
                            )));
                        };
                        // Delete before inserting so that a transfer onto the
                        // same scope still ends with exactly one row.
                        tx.execute(
                            "DELETE FROM process_handle_grants
                             WHERE scope_id = ?1 AND process_id = ?2",
                            params![from_scope_id.as_str(), process_id.as_str()],
                        )
                        .map_err(process_sqlite_error)?;
                        tx.execute(
                            "INSERT INTO process_handle_grants (session_id, scope_id, process_id, descriptor_json)
                             VALUES (?1, ?2, ?3, ?4)
                             ON CONFLICT(scope_id, process_id) DO UPDATE SET
                                 session_id = excluded.session_id,
                                 descriptor_json = excluded.descriptor_json",
                            params![
                                to_scope.session_id.as_str(),
                                to_scope_id.as_str(),
                                process_id.as_str(),
                                descriptor_json
                            ],
                        )
                        .map_err(process_sqlite_error)?;
                    }
                    Ok(())
                })()))
            })
            .await
            .map_err(process_sqlite_error)?
    }
408
409 async fn list_handle_grants(
410 &self,
411 session_scope: &SessionScope,
412 ) -> Result<Vec<ProcessHandleGrantEntry>, lash_core::PluginError> {
413 let session_scope = session_scope.clone();
414 self.conn
415 .call(move |conn| {
416 Ok(Self::list_grants_for_scope_conn(
417 conn,
418 &session_scope,
419 false,
420 ))
421 })
422 .await
423 .map_err(process_sqlite_error)?
424 }
425
426 async fn list_live_handle_grants(
427 &self,
428 session_scope: &SessionScope,
429 ) -> Result<Vec<ProcessHandleGrantEntry>, lash_core::PluginError> {
430 let session_scope = session_scope.clone();
431 self.conn
432 .call(move |conn| Ok(Self::list_grants_for_scope_conn(conn, &session_scope, true)))
433 .await
434 .map_err(process_sqlite_error)?
435 }
436
437 async fn has_handle_grant(
438 &self,
439 session_scope: &SessionScope,
440 process_id: &str,
441 ) -> Result<bool, lash_core::PluginError> {
442 let session_scope_id = session_scope.id().as_str().to_string();
443 let process_id = process_id.to_string();
444 self.conn
445 .call(move |conn| {
446 let exists = conn
447 .query_row(
448 "SELECT 1
449 FROM process_handle_grants g
450 JOIN processes p ON p.process_id = g.process_id
451 WHERE g.scope_id = ?1 AND g.process_id = ?2
452 LIMIT 1",
453 params![session_scope_id, process_id],
454 |_| Ok(()),
455 )
456 .optional()?
457 .is_some();
458 Ok(exists)
459 })
460 .await
461 .map_err(process_sqlite_error)
462 }
463
464 async fn handle_grants_for_process(
465 &self,
466 process_id: &str,
467 ) -> Result<Vec<ProcessHandleGrant>, lash_core::PluginError> {
468 let process_id = process_id.to_string();
469 self.conn
470 .call(move |conn| {
471 Ok((|| {
472 if Self::load_process_conn(conn, &process_id)?.is_none() {
473 return Err(lash_core::PluginError::Session(format!(
474 "unknown process `{process_id}`"
475 )));
476 }
477 let mut stmt = conn
478 .prepare(
479 "SELECT session_id, descriptor_json
480 FROM process_handle_grants
481 WHERE process_id = ?1
482 ORDER BY session_id ASC, scope_id ASC",
483 )
484 .map_err(process_sqlite_error)?;
485 let rows = stmt
486 .query_map(params![process_id], |row| {
487 Ok((row.get::<_, String>(0)?, row.get::<_, String>(1)?))
488 })
489 .map_err(process_sqlite_error)?;
490 let mut grants = Vec::new();
491 for row in rows {
492 let (session_id, descriptor_json) = row.map_err(process_sqlite_error)?;
493 let descriptor: ProcessHandleDescriptor =
494 serde_json::from_str(&descriptor_json).map_err(process_decode_error)?;
495 grants.push(ProcessHandleGrant {
496 session_id,
497 process_id: process_id.clone(),
498 descriptor,
499 });
500 }
501 Ok(grants)
502 })())
503 })
504 .await
505 .map_err(process_sqlite_error)?
506 }
507
    /// Removes the process-related state held by session `session_id` and
    /// reports what was affected.
    ///
    /// Inside one `write_flow` closure this:
    ///
    /// 1. snapshots the session's grants together with the granted process
    ///    records;
    /// 2. deletes all grants of the session;
    /// 3. classifies each previously granted, non-terminal process as
    ///    *orphaned* (no grants remain) or *preserved* (other grants remain);
    /// 4. loads every process record and re-saves those for which
    ///    `clear_wake_target_for_session` returns `true`.
    ///
    /// `deleted_wake_count` is always reported as `0` by this implementation.
    /// The id lists are sorted and de-duplicated before being returned.
    async fn delete_session_process_state(
        &self,
        session_id: &str,
    ) -> Result<lash_core::ProcessSessionDeleteReport, lash_core::PluginError> {
        let session_id_owned = session_id.to_string();
        let (
            revoked_handle_count,
            deleted_wake_count,
            mut orphaned_process_ids,
            mut preserved_process_ids,
        ) = self
            .conn
            .write_flow(move |tx| {
                Ok(tx_outcome((|| {
                    let session_id = session_id_owned;
                    // Snapshot the grants before deleting them; the records
                    // are needed for the terminal check below.
                    let removed = {
                        let mut stmt = tx
                            .prepare(
                                "SELECT g.process_id, p.record_json
                                 FROM process_handle_grants g
                                 JOIN processes p ON p.process_id = g.process_id
                                 WHERE g.session_id = ?1
                                 ORDER BY g.process_id ASC",
                            )
                            .map_err(process_sqlite_error)?;
                        let rows = stmt
                            .query_map(params![session_id], |row| {
                                Ok((row.get::<_, String>(0)?, row.get::<_, String>(1)?))
                            })
                            .map_err(process_sqlite_error)?;
                        let mut removed = Vec::new();
                        for row in rows {
                            let (process_id, record_json) = row.map_err(process_sqlite_error)?;
                            let record: ProcessRecord =
                                serde_json::from_str(&record_json).map_err(process_decode_error)?;
                            removed.push((process_id, record));
                        }
                        removed
                    };

                    // No wake rows are deleted here; the count is fixed at 0.
                    let deleted_wake_count = 0;
                    let revoked_handle_count = tx
                        .execute(
                            "DELETE FROM process_handle_grants WHERE session_id = ?1",
                            params![session_id],
                        )
                        .map_err(process_sqlite_error)?;
                    let mut orphaned_process_ids = Vec::new();
                    let mut preserved_process_ids = Vec::new();
                    for (process_id, record) in removed {
                        // Terminal processes appear in neither list.
                        if record.is_terminal() {
                            continue;
                        }
                        // Counted after the delete above, so only grants from
                        // other sessions remain.
                        let remaining_grants: i64 = tx
                            .query_row(
                                "SELECT COUNT(*) FROM process_handle_grants WHERE process_id = ?1",
                                params![process_id],
                                |row| row.get(0),
                            )
                            .map_err(process_sqlite_error)?;
                        if remaining_grants == 0 {
                            orphaned_process_ids.push(process_id);
                        } else {
                            preserved_process_ids.push(process_id);
                        }
                    }
                    // Full scan of `processes`: every record is decoded so the
                    // wake-target check can run against it.
                    let wake_targeted = {
                        let mut stmt = tx
                            .prepare("SELECT process_id, record_json FROM processes ORDER BY process_id ASC")
                            .map_err(process_sqlite_error)?;
                        let rows = stmt
                            .query_map([], |row| {
                                Ok((row.get::<_, String>(0)?, row.get::<_, String>(1)?))
                            })
                            .map_err(process_sqlite_error)?;
                        let mut records = Vec::new();
                        for row in rows {
                            let (process_id, record_json) = row.map_err(process_sqlite_error)?;
                            let record: ProcessRecord =
                                serde_json::from_str(&record_json).map_err(process_decode_error)?;
                            records.push((process_id, record));
                        }
                        records
                    };
                    for (_process_id, mut record) in wake_targeted {
                        // Only records the call reports as changed are saved.
                        if record.clear_wake_target_for_session(&session_id) {
                            Self::save_process_conn(tx, &record)?;
                        }
                    }
                    Ok((
                        revoked_handle_count,
                        deleted_wake_count,
                        orphaned_process_ids,
                        preserved_process_ids,
                    ))
                })()))
            })
            .await
            .map_err(process_sqlite_error)??;
        orphaned_process_ids.sort();
        orphaned_process_ids.dedup();
        preserved_process_ids.sort();
        preserved_process_ids.dedup();
        Ok(lash_core::ProcessSessionDeleteReport {
            session_id: session_id.to_string(),
            revoked_handle_count,
            deleted_wake_count,
            orphaned_process_ids,
            preserved_process_ids,
        })
    }
623
    /// Appends an event to the log of `process_id`.
    ///
    /// Inside one `write_flow` closure the process record is loaded, any
    /// event already stored under the request's replay key is looked up, and
    /// the next sequence number (`MAX(sequence) + 1`, starting at 1) is
    /// computed. `prepare_process_event_append` then decides the outcome:
    ///
    /// * when it reports a replay, its event is returned and nothing is
    ///   written;
    /// * otherwise the event row is inserted and the process record is saved
    ///   with the new `updated_at_ms` and, if provided, the new status. A
    ///   terminal status also clears `record.wait`.
    ///
    /// Waiters on the registry notifier are woken only when a new event was
    /// actually appended.
    ///
    /// # Errors
    ///
    /// Returns `PluginError::Session` when `process_id` is unknown; errors
    /// from `prepare_process_event_append`, SQLite and encoding are
    /// propagated.
    async fn append_event(
        &self,
        process_id: &str,
        request: ProcessEventAppendRequest,
    ) -> Result<ProcessEventAppendResult, lash_core::PluginError> {
        let process_id = process_id.to_string();
        let (result, appended) = self
            .conn
            .write_flow(move |tx| {
                Ok(tx_outcome((|| {
                    let mut record =
                        Self::load_process_conn(tx, &process_id)?.ok_or_else(|| {
                            lash_core::PluginError::Session(format!(
                                "unknown process `{process_id}`"
                            ))
                        })?;
                    // Only requests that carry a replay key can match a
                    // previously stored event.
                    let replay_lookup = if let Some(replay_key) =
                        request.replay.as_ref().map(|replay| replay.key.as_str())
                    {
                        Self::load_event_by_key_conn(tx, &process_id, replay_key)?
                    } else {
                        None
                    };
                    let sequence = tx
                        .query_row(
                            "SELECT COALESCE(MAX(sequence), 0) + 1 FROM process_events WHERE process_id = ?1",
                            params![process_id],
                            |row| row.get::<_, i64>(0),
                        )
                        .map_err(process_sqlite_error)? as u64;
                    let occurred_at_ms = current_epoch_ms();
                    let prepared = prepare_process_event_append(
                        &record,
                        request,
                        sequence,
                        replay_lookup,
                        occurred_at_ms,
                    )?;
                    if prepared.replayed {
                        // Replay: return the prepared event without writing;
                        // the `false` flag suppresses the notification.
                        return Ok((
                            ProcessEventAppendResult {
                                event: prepared.event,
                                wake_delivery: prepared.wake_delivery,
                            },
                            false,
                        ));
                    }
                    let event = prepared.event;
                    tx.execute(
                        "INSERT INTO process_events (
                             process_id, sequence, event_type, payload_hash, idempotency_key,
                             occurred_at_ms, event_json
                         )
                         VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7)",
                        params![
                            process_id,
                            sequence as i64,
                            event.event_type.as_str(),
                            prepared.payload_hash.as_str(),
                            event.invocation.replay_key(),
                            prepared.occurred_at_ms as i64,
                            process_encode_json(&event)?,
                        ],
                    )
                    .map_err(process_sqlite_error)?;
                    if let Some(status) = prepared.status_update.clone() {
                        record.status = status;
                        // A terminal process no longer has a wait state.
                        if record.status.is_terminal() {
                            record.wait = None;
                        }
                    }
                    record.updated_at_ms = prepared.occurred_at_ms;
                    Self::save_process_conn(tx, &record)?;
                    Ok((
                        ProcessEventAppendResult {
                            event,
                            wake_delivery: prepared.wake_delivery,
                        },
                        true,
                    ))
                })()))
            })
            .await
            .map_err(process_sqlite_error)??;
        if appended {
            self.notify.notify_waiters();
        }
        Ok(result)
    }
713
714 async fn events_after(
715 &self,
716 process_id: &str,
717 after_sequence: u64,
718 ) -> Result<Vec<ProcessEvent>, lash_core::PluginError> {
719 let process_id = process_id.to_string();
720 self.conn
721 .call(move |conn| {
722 Ok((|| {
723 if Self::load_process_conn(conn, &process_id)?.is_none() {
724 return Err(lash_core::PluginError::Session(format!(
725 "unknown process `{process_id}`"
726 )));
727 }
728 let mut stmt = conn
729 .prepare(
730 "SELECT event_json FROM process_events
731 WHERE process_id = ?1 AND sequence > ?2
732 ORDER BY sequence ASC",
733 )
734 .map_err(process_sqlite_error)?;
735 let rows = stmt
736 .query_map(params![process_id, after_sequence as i64], |row| {
737 row.get::<_, String>(0)
738 })
739 .map_err(process_sqlite_error)?;
740 let mut events = Vec::new();
741 for row in rows {
742 events.push(
743 serde_json::from_str(&row.map_err(process_sqlite_error)?)
744 .map_err(process_decode_error)?,
745 );
746 }
747 Ok(events)
748 })())
749 })
750 .await
751 .map_err(process_sqlite_error)?
752 }
753
754 async fn count_events_through(
755 &self,
756 process_id: &str,
757 event_type: &str,
758 up_to_sequence: u64,
759 ) -> Result<u64, lash_core::PluginError> {
760 let process_id = process_id.to_string();
761 let event_type = event_type.to_string();
762 self.conn
763 .call(move |conn| {
764 Ok((|| {
765 if Self::load_process_conn(conn, &process_id)?.is_none() {
766 return Err(lash_core::PluginError::Session(format!(
767 "unknown process `{process_id}`"
768 )));
769 }
770 conn.query_row(
771 "SELECT COUNT(*) FROM process_events
772 WHERE process_id = ?1 AND event_type = ?2 AND sequence <= ?3",
773 params![process_id, event_type, up_to_sequence as i64],
774 |row| row.get::<_, i64>(0),
775 )
776 .map(|count| count as u64)
777 .map_err(process_sqlite_error)
778 })())
779 })
780 .await
781 .map_err(process_sqlite_error)?
782 }
783
784 async fn recent_events(
785 &self,
786 process_id: &str,
787 limit: usize,
788 ) -> Result<Vec<ProcessEvent>, lash_core::PluginError> {
789 let process_id = process_id.to_string();
790 self.conn
791 .call(move |conn| {
792 Ok((|| {
793 if Self::load_process_conn(conn, &process_id)?.is_none() {
794 return Err(lash_core::PluginError::Session(format!(
795 "unknown process `{process_id}`"
796 )));
797 }
798 let mut stmt = conn
799 .prepare(
800 "SELECT event_json FROM process_events
801 WHERE process_id = ?1
802 ORDER BY sequence DESC
803 LIMIT ?2",
804 )
805 .map_err(process_sqlite_error)?;
806 let rows = stmt
807 .query_map(params![process_id, limit as i64], |row| {
808 row.get::<_, String>(0)
809 })
810 .map_err(process_sqlite_error)?;
811 let mut events: Vec<ProcessEvent> = Vec::new();
812 for row in rows {
813 events.push(
814 serde_json::from_str(&row.map_err(process_sqlite_error)?)
815 .map_err(process_decode_error)?,
816 );
817 }
818 events.reverse();
819 Ok(events)
820 })())
821 })
822 .await
823 .map_err(process_sqlite_error)?
824 }
825
826 async fn wake_events_after(
827 &self,
828 process_id: &str,
829 after_sequence: u64,
830 ) -> Result<Vec<ProcessEvent>, lash_core::PluginError> {
831 let acked: std::collections::HashSet<u64> = {
832 let process_id = process_id.to_string();
833 self.conn
834 .call(move |conn| {
835 Ok(
836 (|| -> Result<std::collections::HashSet<u64>, lash_core::PluginError> {
837 let mut stmt = conn
838 .prepare(
839 "SELECT sequence FROM process_wake_acks WHERE process_id = ?1",
840 )
841 .map_err(process_sqlite_error)?;
842 let rows = stmt
843 .query_map(params![process_id], |row| row.get::<_, i64>(0))
844 .map_err(process_sqlite_error)?;
845 let mut set = std::collections::HashSet::new();
846 for row in rows {
847 set.insert(row.map_err(process_sqlite_error)? as u64);
848 }
849 Ok(set)
850 })(),
851 )
852 })
853 .await
854 .map_err(process_sqlite_error)??
855 };
856 Ok(self
857 .events_after(process_id, after_sequence)
858 .await?
859 .into_iter()
860 .filter(|event| event.semantics.wake.is_some() && !acked.contains(&event.sequence))
861 .collect())
862 }
863
864 async fn wait_event_after(
865 &self,
866 process_id: &str,
867 event_type: &str,
868 after_sequence: u64,
869 ) -> Result<ProcessEvent, lash_core::PluginError> {
870 loop {
871 if let Some(event) = self
872 .events_after(process_id, after_sequence)
873 .await?
874 .into_iter()
875 .find(|event| event.event_type == event_type)
876 {
877 return Ok(event);
878 }
879 tokio::select! {
880 _ = self.notify.notified() => {}
881 _ = tokio::time::sleep(Duration::from_millis(50)) => {}
882 }
883 }
884 }
885
886 async fn await_process(
887 &self,
888 process_id: &str,
889 ) -> Result<ProcessAwaitOutput, lash_core::PluginError> {
890 loop {
891 let record = self.get_process(process_id).await.ok_or_else(|| {
892 lash_core::PluginError::Session(format!("unknown process `{process_id}`"))
893 })?;
894 if let Some(await_output) = record.status.await_output() {
895 return Ok(await_output.clone());
896 }
897 tokio::select! {
898 _ = self.notify.notified() => {}
899 _ = tokio::time::sleep(Duration::from_millis(50)) => {}
900 }
901 }
902 }
903
904 async fn complete_process(
905 &self,
906 process_id: &str,
907 await_output: ProcessAwaitOutput,
908 ) -> Result<ProcessRecord, lash_core::PluginError> {
909 let event_type = match await_output.terminal_state() {
910 lash_core::ProcessTerminalState::Completed => "process.completed",
911 lash_core::ProcessTerminalState::Failed => "process.failed",
912 lash_core::ProcessTerminalState::Cancelled => "process.cancelled",
913 };
914 self.append_event(
915 process_id,
916 ProcessEventAppendRequest::new(
917 event_type,
918 serde_json::json!({ "await_output": await_output }),
919 )
920 .with_replay_key(format!("process:{process_id}:terminal:{event_type}")),
921 )
922 .await?;
923 self.get_process(process_id).await.ok_or_else(|| {
924 lash_core::PluginError::Session(format!(
925 "unknown process `{process_id}` after terminal event"
926 ))
927 })
928 }
929
930 async fn set_process_wait(
931 &self,
932 process_id: &str,
933 wait: lash_core::WaitState,
934 ) -> Result<ProcessRecord, lash_core::PluginError> {
935 let process_id = process_id.to_string();
936 self.conn
937 .write_flow(move |tx| {
938 Ok(tx_outcome((|| {
939 let mut record =
940 Self::load_process_conn(tx, &process_id)?.ok_or_else(|| {
941 lash_core::PluginError::Session(format!(
942 "unknown process `{process_id}`"
943 ))
944 })?;
945 if record.is_terminal() {
946 return Err(lash_core::PluginError::Session(format!(
947 "terminal process `{process_id}` cannot enter a wait state"
948 )));
949 }
950 record.wait = Some(wait);
951 record.updated_at_ms = current_epoch_ms();
952 Self::save_process_conn(tx, &record)?;
953 Ok(record)
954 })()))
955 })
956 .await
957 .map_err(process_sqlite_error)?
958 }
959
960 async fn clear_process_wait(
961 &self,
962 process_id: &str,
963 ) -> Result<ProcessRecord, lash_core::PluginError> {
964 let process_id = process_id.to_string();
965 self.conn
966 .write_flow(move |tx| {
967 Ok(tx_outcome((|| {
968 let mut record =
969 Self::load_process_conn(tx, &process_id)?.ok_or_else(|| {
970 lash_core::PluginError::Session(format!(
971 "unknown process `{process_id}`"
972 ))
973 })?;
974 record.wait = None;
975 record.updated_at_ms = current_epoch_ms();
976 Self::save_process_conn(tx, &record)?;
977 Ok(record)
978 })()))
979 })
980 .await
981 .map_err(process_sqlite_error)?
982 }
983
984 async fn get_process(&self, process_id: &str) -> Option<ProcessRecord> {
985 let process_id = process_id.to_string();
986 self.conn
987 .call(move |conn| Ok(Self::load_process_conn(conn, &process_id).ok().flatten()))
988 .await
989 .ok()
990 .flatten()
991 }
992
993 async fn list_processes(
994 &self,
995 filter: &lash_core::ProcessListFilter,
996 ) -> Result<Vec<ProcessRecord>, lash_core::PluginError> {
997 let filter = filter.clone();
998 self.conn
999 .call(move |conn| {
1000 Ok((|| {
1001 let mut stmt = conn
1002 .prepare(
1003 "SELECT record_json FROM processes
1004 ORDER BY process_id ASC",
1005 )
1006 .map_err(process_sqlite_error)?;
1007 let rows = stmt
1008 .query_map([], |row| row.get::<_, String>(0))
1009 .map_err(process_sqlite_error)?;
1010 let mut records = Vec::new();
1011 for row in rows {
1012 let record: ProcessRecord =
1013 serde_json::from_str(&row.map_err(process_sqlite_error)?)
1014 .map_err(process_decode_error)?;
1015 if filter.matches_record(&record) {
1016 records.push(record);
1017 }
1018 }
1019 Ok(records)
1020 })())
1021 })
1022 .await
1023 .map_err(process_sqlite_error)?
1024 }
1025
1026 async fn ack_wake(
1027 &self,
1028 process_id: &str,
1029 sequence: u64,
1030 ) -> Result<(), lash_core::PluginError> {
1031 let process_id = process_id.to_string();
1032 self.conn
1033 .call(move |conn| {
1034 Ok((|| {
1035 if Self::load_process_conn(conn, &process_id)?.is_none() {
1036 return Err(lash_core::PluginError::Session(format!(
1037 "unknown process `{process_id}`"
1038 )));
1039 }
1040 conn.execute(
1041 "INSERT OR IGNORE INTO process_wake_acks (process_id, sequence) VALUES (?1, ?2)",
1042 params![process_id, sequence as i64],
1043 )
1044 .map_err(process_sqlite_error)?;
1045 Ok(())
1046 })())
1047 })
1048 .await
1049 .map_err(process_sqlite_error)?
1050 }
1051
1052 async fn list_non_terminal(&self) -> Result<Vec<ProcessRecord>, lash_core::PluginError> {
1053 self.conn
1054 .call(move |conn| {
1055 Ok((|| {
1056 let mut stmt = conn
1057 .prepare(
1058 "SELECT record_json FROM processes
1059 WHERE status = 'running'
1060 ORDER BY process_id ASC",
1061 )
1062 .map_err(process_sqlite_error)?;
1063 let rows = stmt
1064 .query_map([], |row| row.get::<_, String>(0))
1065 .map_err(process_sqlite_error)?;
1066 let mut records = Vec::new();
1067 for row in rows {
1068 let record: ProcessRecord =
1069 serde_json::from_str(&row.map_err(process_sqlite_error)?)
1070 .map_err(process_decode_error)?;
1071 records.push(record);
1072 }
1073 Ok(records)
1074 })())
1075 })
1076 .await
1077 .map_err(process_sqlite_error)?
1078 }
1079
    /// Claims the lease on `process_id` for `owner_id`, valid for
    /// `lease_ttl_ms` from now.
    ///
    /// A lease that has not expired and is held by a different owner blocks
    /// the claim. An expired lease, a released lease, or a lease already held
    /// by `owner_id` is replaced. Each successful claim stores a fencing token
    /// one higher than the value currently in the lease row (starting at 1
    /// when there is no row) and a new lease token.
    ///
    /// # Errors
    ///
    /// Returns `PluginError::Session` when `process_id` is unknown, and the
    /// `process_lease_conflict` error when another owner holds a live lease.
    async fn claim_process_lease(
        &self,
        process_id: &str,
        owner_id: &str,
        lease_ttl_ms: u64,
    ) -> Result<ProcessLease, lash_core::PluginError> {
        let process_id = process_id.to_string();
        let owner_id = owner_id.to_string();
        self.conn
            .write_flow(move |tx| {
                Ok(tx_outcome((|| {
                    if Self::load_process_conn(tx, &process_id)?.is_none() {
                        return Err(lash_core::PluginError::Session(format!(
                            "unknown process `{process_id}`"
                        )));
                    }
                    let now = current_epoch_ms();
                    let current = Self::load_process_lease_conn(tx, &process_id)?;
                    // Only a still-valid lease of a *different* owner is a
                    // conflict.
                    if let Some(current) = current.as_ref()
                        && current.expires_at_epoch_ms > now
                        && current.owner_id != owner_id
                    {
                        return Err(process_lease_conflict(&process_id, current));
                    }
                    // Read the raw column rather than `current`: a row whose
                    // owner/token are NULL loads as `None` but still carries
                    // its last fencing token.
                    let fencing_token: u64 = tx
                        .query_row(
                            "SELECT lease_fencing_token FROM process_leases WHERE process_id = ?1",
                            params![process_id],
                            |row| row.get::<_, i64>(0),
                        )
                        .optional()
                        .map_err(process_sqlite_error)?
                        .unwrap_or(0) as u64
                        + 1;
                    let lease = ProcessLease {
                        schema_version: PROCESS_LEASE_SCHEMA_VERSION,
                        process_id: process_id.clone(),
                        owner_id: owner_id.clone(),
                        // Lease token: lowercase hex SHA-256 of the process
                        // id, owner id, claim time and fencing token.
                        lease_token: format!(
                            "{:x}",
                            Sha256::digest(
                                format!("{process_id}:{owner_id}:{now}:{fencing_token}").as_bytes()
                            )
                        ),
                        fencing_token,
                        claimed_at_epoch_ms: now,
                        expires_at_epoch_ms: now.saturating_add(lease_ttl_ms),
                    };
                    tx.execute(
                        "INSERT INTO process_leases (
                             process_id, lease_owner_id, lease_token, lease_fencing_token,
                             lease_claimed_at_ms, lease_expires_at_ms
                         )
                         VALUES (?1, ?2, ?3, ?4, ?5, ?6)
                         ON CONFLICT(process_id) DO UPDATE SET
                             lease_owner_id = excluded.lease_owner_id,
                             lease_token = excluded.lease_token,
                             lease_fencing_token = excluded.lease_fencing_token,
                             lease_claimed_at_ms = excluded.lease_claimed_at_ms,
                             lease_expires_at_ms = excluded.lease_expires_at_ms",
                        params![
                            lease.process_id.as_str(),
                            lease.owner_id.as_str(),
                            lease.lease_token.as_str(),
                            lease.fencing_token as i64,
                            lease.claimed_at_epoch_ms as i64,
                            lease.expires_at_epoch_ms as i64,
                        ],
                    )
                    .map_err(process_sqlite_error)?;
                    Ok(lease)
                })()))
            })
            .await
            .map_err(process_sqlite_error)?
    }
1160
    /// Extends the expiry of `lease` to `lease_ttl_ms` from now.
    ///
    /// The stored lease is loaded and passed to `guard_lease` together with
    /// the caller's lease token and the current time; renewal proceeds only
    /// when that helper (defined elsewhere) accepts it. The returned lease is
    /// the caller's `lease` with only `expires_at_epoch_ms` replaced.
    ///
    /// # Errors
    ///
    /// Returns the `process_lease_expired` error when `guard_lease` rejects
    /// the lease.
    async fn renew_process_lease(
        &self,
        lease: &ProcessLease,
        lease_ttl_ms: u64,
    ) -> Result<ProcessLease, lash_core::PluginError> {
        let lease = lease.clone();
        self.conn
            .write_flow(move |tx| {
                Ok(tx_outcome((|| {
                    let now = current_epoch_ms();
                    let current = Self::load_process_lease_conn(tx, &lease.process_id)?;
                    if !guard_lease(current.as_ref(), &lease.lease_token, now) {
                        return Err(process_lease_expired(&lease.process_id));
                    }
                    let renewed = ProcessLease {
                        expires_at_epoch_ms: now.saturating_add(lease_ttl_ms),
                        ..lease.clone()
                    };
                    // The token is part of the WHERE clause, so only the row
                    // still carrying this lease token is updated.
                    tx.execute(
                        "UPDATE process_leases
                         SET lease_expires_at_ms = ?2
                         WHERE process_id = ?1 AND lease_token = ?3",
                        params![
                            renewed.process_id.as_str(),
                            renewed.expires_at_epoch_ms as i64,
                            renewed.lease_token.as_str(),
                        ],
                    )
                    .map_err(process_sqlite_error)?;
                    Ok(renewed)
                })()))
            })
            .await
            .map_err(process_sqlite_error)?
    }
1196
1197 async fn complete_process_lease(
1198 &self,
1199 completion: &ProcessLeaseCompletion,
1200 ) -> Result<(), lash_core::PluginError> {
1201 let process_id = completion.process_id.clone();
1202 let lease_token = completion.lease_token.clone();
1203 self.conn
1204 .call(move |conn| {
1205 conn.execute(
1206 "UPDATE process_leases
1207 SET lease_owner_id = NULL,
1208 lease_token = NULL,
1209 lease_claimed_at_ms = 0,
1210 lease_expires_at_ms = 0
1211 WHERE process_id = ?1 AND lease_token = ?2",
1212 params![process_id, lease_token],
1213 )
1214 })
1215 .await
1216 .map_err(process_sqlite_error)?;
1217 Ok(())
1218 }
1219}
1220
1221fn process_lease_conflict(process_id: &str, current: &ProcessLease) -> lash_core::PluginError {
1224 lash_core::PluginError::Session(format!(
1225 "process `{process_id}` is already leased by `{}` until {}",
1226 current.owner_id, current.expires_at_epoch_ms
1227 ))
1228}
1229
1230fn process_lease_expired(process_id: &str) -> lash_core::PluginError {
1232 lash_core::PluginError::Session(format!(
1233 "process lease for `{process_id}` is missing or expired"
1234 ))
1235}