1use super::*;
26
27fn process_status_label(record: &ProcessRecord) -> &'static str {
28 record.status.label()
29}
30
31impl SqliteProcessRegistry {
32 pub async fn open(path: &Path) -> tokio_rusqlite::Result<Self> {
33 let conn = SqliteConnection::open(path).await?;
34 ensure_process_schema(&conn).await?;
35 apply_pragmas(&conn, StoreBacking::File).await?;
36 Ok(Self {
37 conn,
38 notify: tokio::sync::Notify::new(),
39 })
40 }
41
42 pub async fn memory() -> tokio_rusqlite::Result<Self> {
43 let conn = SqliteConnection::open_in_memory().await?;
44 ensure_process_schema(&conn).await?;
45 apply_pragmas(&conn, StoreBacking::Memory).await?;
46 Ok(Self {
47 conn,
48 notify: tokio::sync::Notify::new(),
49 })
50 }
51
52 fn load_process_conn(
53 conn: &Connection,
54 process_id: &str,
55 ) -> Result<Option<ProcessRecord>, lash_core::PluginError> {
56 let json: Option<String> = conn
57 .query_row(
58 "SELECT record_json FROM processes WHERE process_id = ?1",
59 params![process_id],
60 |row| row.get(0),
61 )
62 .optional()
63 .map_err(process_sqlite_error)?;
64 json.map(|json| serde_json::from_str(&json).map_err(process_decode_error))
65 .transpose()
66 }
67
68 fn save_process_conn(
69 conn: &Connection,
70 record: &ProcessRecord,
71 ) -> Result<(), lash_core::PluginError> {
72 conn.execute(
73 "UPDATE processes
74 SET updated_at_ms = ?2, status = ?3, record_json = ?4
75 WHERE process_id = ?1",
76 params![
77 record.id.as_str(),
78 record.updated_at_ms as i64,
79 process_status_label(record),
80 process_encode_json(record)?
81 ],
82 )
83 .map_err(process_sqlite_error)?;
84 Ok(())
85 }
86
87 fn load_event_by_key_conn(
88 conn: &Connection,
89 process_id: &str,
90 replay_key: &str,
91 ) -> Result<Option<(String, ProcessEvent)>, lash_core::PluginError> {
92 let row: Option<(String, String)> = conn
93 .query_row(
94 "SELECT payload_hash, event_json
95 FROM process_events
96 WHERE process_id = ?1 AND idempotency_key = ?2",
97 params![process_id, replay_key],
98 |row| Ok((row.get(0)?, row.get(1)?)),
99 )
100 .optional()
101 .map_err(process_sqlite_error)?;
102 row.map(|(hash, json)| {
103 serde_json::from_str(&json)
104 .map(|event| (hash, event))
105 .map_err(process_decode_error)
106 })
107 .transpose()
108 }
109
110 fn load_process_lease_conn(
111 conn: &Connection,
112 process_id: &str,
113 ) -> Result<Option<ProcessLease>, lash_core::PluginError> {
114 conn.query_row(
115 "SELECT lease_owner_id, lease_token, lease_fencing_token,
116 lease_claimed_at_ms, lease_expires_at_ms
117 FROM process_leases
118 WHERE process_id = ?1",
119 params![process_id],
120 |row| {
121 let owner_id: Option<String> = row.get(0)?;
122 let lease_token: Option<String> = row.get(1)?;
123 let (Some(owner_id), Some(lease_token)) = (owner_id, lease_token) else {
124 return Ok(None);
125 };
126 Ok(Some(ProcessLease {
127 schema_version: PROCESS_LEASE_SCHEMA_VERSION,
128 process_id: process_id.to_string(),
129 owner_id,
130 lease_token,
131 fencing_token: row.get::<_, i64>(2)? as u64,
132 claimed_at_epoch_ms: row.get::<_, i64>(3)? as u64,
133 expires_at_epoch_ms: row.get::<_, i64>(4)? as u64,
134 }))
135 },
136 )
137 .optional()
138 .map(|lease| lease.flatten())
139 .map_err(process_sqlite_error)
140 }
141
142 fn list_grants_for_scope_conn(
143 conn: &Connection,
144 session_scope: &SessionScope,
145 live_only: bool,
146 ) -> Result<Vec<ProcessHandleGrantEntry>, lash_core::PluginError> {
147 let session_scope_id = session_scope.id();
148 let status_clause = if live_only {
149 "AND p.status = 'running'"
150 } else {
151 ""
152 };
153 let mut stmt = conn
154 .prepare(&format!(
155 "SELECT g.process_id, g.descriptor_json, p.record_json
156 FROM process_handle_grants g
157 JOIN processes p ON p.process_id = g.process_id
158 WHERE g.scope_id = ?1 {status_clause}
159 ORDER BY g.process_id ASC"
160 ))
161 .map_err(process_sqlite_error)?;
162 let rows = stmt
163 .query_map(params![session_scope_id.as_str()], |row| {
164 Ok((
165 row.get::<_, String>(0)?,
166 row.get::<_, String>(1)?,
167 row.get::<_, String>(2)?,
168 ))
169 })
170 .map_err(process_sqlite_error)?;
171 let mut entries = Vec::new();
172 for row in rows {
173 let (process_id, descriptor_json, record_json) = row.map_err(process_sqlite_error)?;
174 let descriptor: ProcessHandleDescriptor =
175 serde_json::from_str(&descriptor_json).map_err(process_decode_error)?;
176 let record: ProcessRecord =
177 serde_json::from_str(&record_json).map_err(process_decode_error)?;
178 entries.push((
179 ProcessHandleGrant {
180 session_id: session_scope.session_id.clone(),
181 process_id,
182 descriptor,
183 },
184 record,
185 ));
186 }
187 Ok(entries)
188 }
189}
190
191fn tx_outcome<T>(
196 result: Result<T, lash_core::PluginError>,
197) -> TxOutcome<Result<T, lash_core::PluginError>> {
198 match result {
199 Ok(value) => TxOutcome::Commit(Ok(value)),
200 Err(err) => TxOutcome::Rollback(Err(err)),
201 }
202}
203
204#[async_trait::async_trait]
205impl ProcessRegistry for SqliteProcessRegistry {
    /// Reports this registry's durability tier, which is always
    /// `DurabilityTier::Durable`.
    fn durability_tier(&self) -> DurabilityTier {
        DurabilityTier::Durable
    }
209
210 async fn register_process(
211 &self,
212 registration: ProcessRegistration,
213 ) -> Result<ProcessRecord, lash_core::PluginError> {
214 let (registration, registration_hash) = prepare_process_registration(registration)?;
215 let record = self
216 .conn
217 .write_flow(move |tx| {
218 Ok(tx_outcome((|| {
219 if let Some(existing) = Self::load_process_conn(tx, ®istration.id)? {
220 if existing.registration_hash == registration_hash {
221 return Ok(existing);
222 }
223 return Err(lash_core::PluginError::Session(format!(
224 "process `{}` registration hash conflict: existing {}, new {}",
225 registration.id, existing.registration_hash, registration_hash
226 )));
227 }
228 let now = current_epoch_ms();
229 let record = ProcessRecord::from_prepared_registration(
230 registration,
231 registration_hash,
232 now,
233 );
234 let originator_scope_id = record.originator_scope_id();
235 tx.execute(
236 "INSERT INTO processes (
237 process_id, registration_hash, owner_scope_id, host_profile_id,
238 created_at_ms, updated_at_ms, status, record_json
239 )
240 VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8)",
241 params![
242 record.id.as_str(),
243 record.registration_hash.as_str(),
244 originator_scope_id.as_str(),
245 record.host_profile_id(),
246 record.created_at_ms as i64,
247 record.updated_at_ms as i64,
248 process_status_label(&record),
249 process_encode_json(&record)?,
250 ],
251 )
252 .map_err(process_sqlite_error)?;
253 Ok(record)
254 })()))
255 })
256 .await
257 .map_err(process_sqlite_error)??;
258 self.notify.notify_waiters();
259 Ok(record)
260 }
261
262 async fn set_external_ref(
263 &self,
264 process_id: &str,
265 external_ref: ProcessExternalRef,
266 ) -> Result<ProcessRecord, lash_core::PluginError> {
267 let process_id = process_id.to_string();
268 let record = self
269 .conn
270 .write_flow(move |tx| {
271 Ok(tx_outcome((|| {
272 let mut record =
273 Self::load_process_conn(tx, &process_id)?.ok_or_else(|| {
274 lash_core::PluginError::Session(format!(
275 "unknown process `{process_id}`"
276 ))
277 })?;
278 record.external_ref = Some(external_ref);
279 record.updated_at_ms = current_epoch_ms();
280 Self::save_process_conn(tx, &record)?;
281 Ok(record)
282 })()))
283 })
284 .await
285 .map_err(process_sqlite_error)??;
286 self.notify.notify_waiters();
287 Ok(record)
288 }
289
290 async fn grant_handle(
291 &self,
292 session_scope: &SessionScope,
293 process_id: &str,
294 descriptor: ProcessHandleDescriptor,
295 ) -> Result<ProcessHandleGrant, lash_core::PluginError> {
296 let session_scope = session_scope.clone();
297 let process_id = process_id.to_string();
298 self.conn
299 .write_flow(move |tx| {
300 Ok(tx_outcome((|| {
301 let session_scope_id = session_scope.id();
302 if Self::load_process_conn(tx, &process_id)?.is_none() {
303 return Err(lash_core::PluginError::Session(format!(
304 "unknown process `{process_id}`"
305 )));
306 }
307 tx.execute(
308 "INSERT INTO process_handle_grants (session_id, scope_id, process_id, descriptor_json)
309 VALUES (?1, ?2, ?3, ?4)
310 ON CONFLICT(scope_id, process_id) DO UPDATE SET
311 session_id = excluded.session_id,
312 descriptor_json = excluded.descriptor_json",
313 params![
314 session_scope.session_id.as_str(),
315 session_scope_id.as_str(),
316 process_id.as_str(),
317 process_encode_json(&descriptor)?
318 ],
319 )
320 .map_err(process_sqlite_error)?;
321 Ok(ProcessHandleGrant {
322 session_id: session_scope.session_id.clone(),
323 process_id: process_id.clone(),
324 descriptor,
325 })
326 })()))
327 })
328 .await
329 .map_err(process_sqlite_error)?
330 }
331
332 async fn revoke_handle(
333 &self,
334 session_scope: &SessionScope,
335 process_id: &str,
336 ) -> Result<(), lash_core::PluginError> {
337 let session_scope_id = session_scope.id().as_str().to_string();
338 let process_id = process_id.to_string();
339 self.conn
340 .call(move |conn| {
341 conn.execute(
342 "DELETE FROM process_handle_grants WHERE scope_id = ?1 AND process_id = ?2",
343 params![session_scope_id, process_id],
344 )
345 })
346 .await
347 .map_err(process_sqlite_error)?;
348 Ok(())
349 }
350
351 async fn transfer_handle_grants(
352 &self,
353 from_scope: &SessionScope,
354 to_scope: &SessionScope,
355 process_ids: &[String],
356 ) -> Result<(), lash_core::PluginError> {
357 let from_scope = from_scope.clone();
358 let to_scope = to_scope.clone();
359 let process_ids = process_ids.to_vec();
360 self.conn
361 .write_flow(move |tx| {
362 Ok(tx_outcome((|| {
363 let from_scope_id = from_scope.id();
364 let to_scope_id = to_scope.id();
365 for process_id in &process_ids {
366 let descriptor_json: Option<String> = tx
367 .query_row(
368 "SELECT descriptor_json
369 FROM process_handle_grants
370 WHERE scope_id = ?1 AND process_id = ?2",
371 params![from_scope_id.as_str(), process_id.as_str()],
372 |row| row.get(0),
373 )
374 .optional()
375 .map_err(process_sqlite_error)?;
376 let Some(descriptor_json) = descriptor_json else {
377 return Err(lash_core::PluginError::Session(format!(
378 "process handle `{process_id}` is not granted to session `{}`",
379 from_scope.session_id
380 )));
381 };
382 tx.execute(
383 "DELETE FROM process_handle_grants
384 WHERE scope_id = ?1 AND process_id = ?2",
385 params![from_scope_id.as_str(), process_id.as_str()],
386 )
387 .map_err(process_sqlite_error)?;
388 tx.execute(
389 "INSERT INTO process_handle_grants (session_id, scope_id, process_id, descriptor_json)
390 VALUES (?1, ?2, ?3, ?4)
391 ON CONFLICT(scope_id, process_id) DO UPDATE SET
392 session_id = excluded.session_id,
393 descriptor_json = excluded.descriptor_json",
394 params![
395 to_scope.session_id.as_str(),
396 to_scope_id.as_str(),
397 process_id.as_str(),
398 descriptor_json
399 ],
400 )
401 .map_err(process_sqlite_error)?;
402 }
403 Ok(())
404 })()))
405 })
406 .await
407 .map_err(process_sqlite_error)?
408 }
409
410 async fn list_handle_grants(
411 &self,
412 session_scope: &SessionScope,
413 ) -> Result<Vec<ProcessHandleGrantEntry>, lash_core::PluginError> {
414 let session_scope = session_scope.clone();
415 self.conn
416 .call(move |conn| {
417 Ok(Self::list_grants_for_scope_conn(
418 conn,
419 &session_scope,
420 false,
421 ))
422 })
423 .await
424 .map_err(process_sqlite_error)?
425 }
426
427 async fn list_live_handle_grants(
428 &self,
429 session_scope: &SessionScope,
430 ) -> Result<Vec<ProcessHandleGrantEntry>, lash_core::PluginError> {
431 let session_scope = session_scope.clone();
432 self.conn
433 .call(move |conn| Ok(Self::list_grants_for_scope_conn(conn, &session_scope, true)))
434 .await
435 .map_err(process_sqlite_error)?
436 }
437
438 async fn has_handle_grant(
439 &self,
440 session_scope: &SessionScope,
441 process_id: &str,
442 ) -> Result<bool, lash_core::PluginError> {
443 let session_scope_id = session_scope.id().as_str().to_string();
444 let process_id = process_id.to_string();
445 self.conn
446 .call(move |conn| {
447 let exists = conn
448 .query_row(
449 "SELECT 1
450 FROM process_handle_grants g
451 JOIN processes p ON p.process_id = g.process_id
452 WHERE g.scope_id = ?1 AND g.process_id = ?2
453 LIMIT 1",
454 params![session_scope_id, process_id],
455 |_| Ok(()),
456 )
457 .optional()?
458 .is_some();
459 Ok(exists)
460 })
461 .await
462 .map_err(process_sqlite_error)
463 }
464
465 async fn handle_grants_for_process(
466 &self,
467 process_id: &str,
468 ) -> Result<Vec<ProcessHandleGrant>, lash_core::PluginError> {
469 let process_id = process_id.to_string();
470 self.conn
471 .call(move |conn| {
472 Ok((|| {
473 if Self::load_process_conn(conn, &process_id)?.is_none() {
474 return Err(lash_core::PluginError::Session(format!(
475 "unknown process `{process_id}`"
476 )));
477 }
478 let mut stmt = conn
479 .prepare(
480 "SELECT session_id, descriptor_json
481 FROM process_handle_grants
482 WHERE process_id = ?1
483 ORDER BY session_id ASC, scope_id ASC",
484 )
485 .map_err(process_sqlite_error)?;
486 let rows = stmt
487 .query_map(params![process_id], |row| {
488 Ok((row.get::<_, String>(0)?, row.get::<_, String>(1)?))
489 })
490 .map_err(process_sqlite_error)?;
491 let mut grants = Vec::new();
492 for row in rows {
493 let (session_id, descriptor_json) = row.map_err(process_sqlite_error)?;
494 let descriptor: ProcessHandleDescriptor =
495 serde_json::from_str(&descriptor_json).map_err(process_decode_error)?;
496 grants.push(ProcessHandleGrant {
497 session_id,
498 process_id: process_id.clone(),
499 descriptor,
500 });
501 }
502 Ok(grants)
503 })())
504 })
505 .await
506 .map_err(process_sqlite_error)?
507 }
508
    /// Removes every handle grant held by `session_id` and reports what
    /// happened to the affected processes.
    ///
    /// Inside one write transaction this:
    /// 1. snapshots the granted processes (id + decoded record),
    /// 2. deletes all grants for the session,
    /// 3. classifies each non-terminal process as *orphaned* (no grants left
    ///    from any session) or *preserved* (still granted elsewhere).
    ///
    /// Terminal processes are not reported in either list. The returned
    /// `deleted_wake_count` is always `0` here.
    async fn delete_session_process_state(
        &self,
        session_id: &str,
    ) -> Result<lash_core::ProcessSessionDeleteReport, lash_core::PluginError> {
        let session_id_owned = session_id.to_string();
        let (
            revoked_handle_count,
            deleted_wake_count,
            mut orphaned_process_ids,
            mut preserved_process_ids,
        ) = self
            .conn
            .write_flow(move |tx| {
                Ok(tx_outcome((|| {
                    let session_id = session_id_owned;
                    // Snapshot the granted processes before the grants are deleted;
                    // the inner block ends the statement borrow before later writes.
                    let removed = {
                        let mut stmt = tx
                            .prepare(
                                "SELECT g.process_id, p.record_json
                                 FROM process_handle_grants g
                                 JOIN processes p ON p.process_id = g.process_id
                                 WHERE g.session_id = ?1
                                 ORDER BY g.process_id ASC",
                            )
                            .map_err(process_sqlite_error)?;
                        let rows = stmt
                            .query_map(params![session_id], |row| {
                                Ok((row.get::<_, String>(0)?, row.get::<_, String>(1)?))
                            })
                            .map_err(process_sqlite_error)?;
                        let mut removed = Vec::new();
                        for row in rows {
                            let (process_id, record_json) = row.map_err(process_sqlite_error)?;
                            let record: ProcessRecord =
                                serde_json::from_str(&record_json).map_err(process_decode_error)?;
                            removed.push((process_id, record));
                        }
                        removed
                    };

                    // No wake rows are deleted by this implementation.
                    let deleted_wake_count = 0;
                    let revoked_handle_count = tx
                        .execute(
                            "DELETE FROM process_handle_grants WHERE session_id = ?1",
                            params![session_id],
                        )
                        .map_err(process_sqlite_error)?;
                    let mut orphaned_process_ids = Vec::new();
                    let mut preserved_process_ids = Vec::new();
                    for (process_id, record) in removed {
                        if record.is_terminal() {
                            continue;
                        }
                        // Counted after the delete above, so only grants from
                        // other sessions remain.
                        let remaining_grants: i64 = tx
                            .query_row(
                                "SELECT COUNT(*) FROM process_handle_grants WHERE process_id = ?1",
                                params![process_id],
                                |row| row.get(0),
                            )
                            .map_err(process_sqlite_error)?;
                        if remaining_grants == 0 {
                            orphaned_process_ids.push(process_id);
                        } else {
                            preserved_process_ids.push(process_id);
                        }
                    }
                    Ok((
                        revoked_handle_count,
                        deleted_wake_count,
                        orphaned_process_ids,
                        preserved_process_ids,
                    ))
                })()))
            })
            .await
            .map_err(process_sqlite_error)??;
        // A session may hold grants on the same process through several scopes,
        // so the id lists can contain duplicates.
        orphaned_process_ids.sort();
        orphaned_process_ids.dedup();
        preserved_process_ids.sort();
        preserved_process_ids.dedup();
        Ok(lash_core::ProcessSessionDeleteReport {
            session_id: session_id.to_string(),
            revoked_handle_count,
            deleted_wake_count,
            orphaned_process_ids,
            preserved_process_ids,
        })
    }
601
    /// Appends an event to `process_id`'s event log inside one write
    /// transaction.
    ///
    /// The next sequence number is `MAX(sequence) + 1` for the process. When
    /// the request carries a replay key, any event already stored under that
    /// key is looked up and handed to `prepare_process_event_append`; if the
    /// prepared result is marked `replayed`, nothing is written and waiters
    /// are not notified.
    ///
    /// Otherwise the event row is inserted, an optional status update is
    /// applied to the record (clearing `wait` when the new status is
    /// terminal), the record is saved, and waiters are notified.
    ///
    /// # Errors
    /// Fails with a session error when the process is unknown, and propagates
    /// errors from event preparation, SQLite, and JSON encoding.
    async fn append_event(
        &self,
        process_id: &str,
        request: ProcessEventAppendRequest,
    ) -> Result<ProcessEventAppendResult, lash_core::PluginError> {
        let process_id = process_id.to_string();
        let (result, appended) = self
            .conn
            .write_flow(move |tx| {
                Ok(tx_outcome((|| {
                    let mut record =
                        Self::load_process_conn(tx, &process_id)?.ok_or_else(|| {
                            lash_core::PluginError::Session(format!(
                                "unknown process `{process_id}`"
                            ))
                        })?;
                    // Only requests with a replay key can match a stored event.
                    let replay_lookup = if let Some(replay_key) =
                        request.replay.as_ref().map(|replay| replay.key.as_str())
                    {
                        Self::load_event_by_key_conn(tx, &process_id, replay_key)?
                    } else {
                        None
                    };
                    // Sequences start at 1 for a process with no events.
                    let sequence = tx
                        .query_row(
                            "SELECT COALESCE(MAX(sequence), 0) + 1 FROM process_events WHERE process_id = ?1",
                            params![process_id],
                            |row| row.get::<_, i64>(0),
                        )
                        .map_err(process_sqlite_error)? as u64;
                    let occurred_at_ms = current_epoch_ms();
                    let prepared = prepare_process_event_append(
                        &record,
                        request,
                        sequence,
                        replay_lookup,
                        occurred_at_ms,
                    )?;
                    // Replays return the prepared event without writing anything.
                    if prepared.replayed {
                        return Ok((
                            ProcessEventAppendResult {
                                event: prepared.event,
                                wake_delivery: prepared.wake_delivery,
                            },
                            false,
                        ));
                    }
                    let event = prepared.event;
                    tx.execute(
                        "INSERT INTO process_events (
                            process_id, sequence, event_type, payload_hash, idempotency_key,
                            occurred_at_ms, event_json
                         )
                         VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7)",
                        params![
                            process_id,
                            sequence as i64,
                            event.event_type.as_str(),
                            prepared.payload_hash.as_str(),
                            event.invocation.replay_key(),
                            prepared.occurred_at_ms as i64,
                            process_encode_json(&event)?,
                        ],
                    )
                    .map_err(process_sqlite_error)?;
                    if let Some(status) = prepared.status_update.clone() {
                        record.status = status;
                        // A terminal process no longer has a pending wait.
                        if record.status.is_terminal() {
                            record.wait = None;
                        }
                    }
                    record.updated_at_ms = prepared.occurred_at_ms;
                    Self::save_process_conn(tx, &record)?;
                    Ok((
                        ProcessEventAppendResult {
                            event,
                            wake_delivery: prepared.wake_delivery,
                        },
                        true,
                    ))
                })()))
            })
            .await
            .map_err(process_sqlite_error)??;
        // Only a newly written event wakes waiters.
        if appended {
            self.notify.notify_waiters();
        }
        Ok(result)
    }
691
692 async fn events_after(
693 &self,
694 process_id: &str,
695 after_sequence: u64,
696 ) -> Result<Vec<ProcessEvent>, lash_core::PluginError> {
697 let process_id = process_id.to_string();
698 self.conn
699 .call(move |conn| {
700 Ok((|| {
701 if Self::load_process_conn(conn, &process_id)?.is_none() {
702 return Err(lash_core::PluginError::Session(format!(
703 "unknown process `{process_id}`"
704 )));
705 }
706 let mut stmt = conn
707 .prepare(
708 "SELECT event_json FROM process_events
709 WHERE process_id = ?1 AND sequence > ?2
710 ORDER BY sequence ASC",
711 )
712 .map_err(process_sqlite_error)?;
713 let rows = stmt
714 .query_map(params![process_id, after_sequence as i64], |row| {
715 row.get::<_, String>(0)
716 })
717 .map_err(process_sqlite_error)?;
718 let mut events = Vec::new();
719 for row in rows {
720 events.push(
721 serde_json::from_str(&row.map_err(process_sqlite_error)?)
722 .map_err(process_decode_error)?,
723 );
724 }
725 Ok(events)
726 })())
727 })
728 .await
729 .map_err(process_sqlite_error)?
730 }
731
732 async fn count_events_through(
733 &self,
734 process_id: &str,
735 event_type: &str,
736 up_to_sequence: u64,
737 ) -> Result<u64, lash_core::PluginError> {
738 let process_id = process_id.to_string();
739 let event_type = event_type.to_string();
740 self.conn
741 .call(move |conn| {
742 Ok((|| {
743 if Self::load_process_conn(conn, &process_id)?.is_none() {
744 return Err(lash_core::PluginError::Session(format!(
745 "unknown process `{process_id}`"
746 )));
747 }
748 conn.query_row(
749 "SELECT COUNT(*) FROM process_events
750 WHERE process_id = ?1 AND event_type = ?2 AND sequence <= ?3",
751 params![process_id, event_type, up_to_sequence as i64],
752 |row| row.get::<_, i64>(0),
753 )
754 .map(|count| count as u64)
755 .map_err(process_sqlite_error)
756 })())
757 })
758 .await
759 .map_err(process_sqlite_error)?
760 }
761
762 async fn recent_events(
763 &self,
764 process_id: &str,
765 limit: usize,
766 ) -> Result<Vec<ProcessEvent>, lash_core::PluginError> {
767 let process_id = process_id.to_string();
768 self.conn
769 .call(move |conn| {
770 Ok((|| {
771 if Self::load_process_conn(conn, &process_id)?.is_none() {
772 return Err(lash_core::PluginError::Session(format!(
773 "unknown process `{process_id}`"
774 )));
775 }
776 let mut stmt = conn
777 .prepare(
778 "SELECT event_json FROM process_events
779 WHERE process_id = ?1
780 ORDER BY sequence DESC
781 LIMIT ?2",
782 )
783 .map_err(process_sqlite_error)?;
784 let rows = stmt
785 .query_map(params![process_id, limit as i64], |row| {
786 row.get::<_, String>(0)
787 })
788 .map_err(process_sqlite_error)?;
789 let mut events: Vec<ProcessEvent> = Vec::new();
790 for row in rows {
791 events.push(
792 serde_json::from_str(&row.map_err(process_sqlite_error)?)
793 .map_err(process_decode_error)?,
794 );
795 }
796 events.reverse();
797 Ok(events)
798 })())
799 })
800 .await
801 .map_err(process_sqlite_error)?
802 }
803
804 async fn wake_events_after(
805 &self,
806 process_id: &str,
807 after_sequence: u64,
808 ) -> Result<Vec<ProcessEvent>, lash_core::PluginError> {
809 let acked: std::collections::HashSet<u64> = {
810 let process_id = process_id.to_string();
811 self.conn
812 .call(move |conn| {
813 Ok(
814 (|| -> Result<std::collections::HashSet<u64>, lash_core::PluginError> {
815 let mut stmt = conn
816 .prepare(
817 "SELECT sequence FROM process_wake_acks WHERE process_id = ?1",
818 )
819 .map_err(process_sqlite_error)?;
820 let rows = stmt
821 .query_map(params![process_id], |row| row.get::<_, i64>(0))
822 .map_err(process_sqlite_error)?;
823 let mut set = std::collections::HashSet::new();
824 for row in rows {
825 set.insert(row.map_err(process_sqlite_error)? as u64);
826 }
827 Ok(set)
828 })(),
829 )
830 })
831 .await
832 .map_err(process_sqlite_error)??
833 };
834 Ok(self
835 .events_after(process_id, after_sequence)
836 .await?
837 .into_iter()
838 .filter(|event| event.semantics.wake.is_some() && !acked.contains(&event.sequence))
839 .collect())
840 }
841
842 async fn wait_event_after(
843 &self,
844 process_id: &str,
845 event_type: &str,
846 after_sequence: u64,
847 ) -> Result<ProcessEvent, lash_core::PluginError> {
848 loop {
849 if let Some(event) = self
850 .events_after(process_id, after_sequence)
851 .await?
852 .into_iter()
853 .find(|event| event.event_type == event_type)
854 {
855 return Ok(event);
856 }
857 tokio::select! {
858 _ = self.notify.notified() => {}
859 _ = tokio::time::sleep(Duration::from_millis(50)) => {}
860 }
861 }
862 }
863
864 async fn await_process(
865 &self,
866 process_id: &str,
867 ) -> Result<ProcessAwaitOutput, lash_core::PluginError> {
868 loop {
869 let record = self.get_process(process_id).await.ok_or_else(|| {
870 lash_core::PluginError::Session(format!("unknown process `{process_id}`"))
871 })?;
872 if let Some(await_output) = record.status.await_output() {
873 return Ok(await_output.clone());
874 }
875 tokio::select! {
876 _ = self.notify.notified() => {}
877 _ = tokio::time::sleep(Duration::from_millis(50)) => {}
878 }
879 }
880 }
881
882 async fn complete_process(
883 &self,
884 process_id: &str,
885 await_output: ProcessAwaitOutput,
886 ) -> Result<ProcessRecord, lash_core::PluginError> {
887 let event_type = match await_output.terminal_state() {
888 lash_core::ProcessTerminalState::Completed => "process.completed",
889 lash_core::ProcessTerminalState::Failed => "process.failed",
890 lash_core::ProcessTerminalState::Cancelled => "process.cancelled",
891 };
892 self.append_event(
893 process_id,
894 ProcessEventAppendRequest::new(
895 event_type,
896 serde_json::json!({ "await_output": await_output }),
897 )
898 .with_replay_key(format!("process:{process_id}:terminal:{event_type}")),
899 )
900 .await?;
901 self.get_process(process_id).await.ok_or_else(|| {
902 lash_core::PluginError::Session(format!(
903 "unknown process `{process_id}` after terminal event"
904 ))
905 })
906 }
907
908 async fn set_process_wait(
909 &self,
910 process_id: &str,
911 wait: lash_core::WaitState,
912 ) -> Result<ProcessRecord, lash_core::PluginError> {
913 let process_id = process_id.to_string();
914 self.conn
915 .write_flow(move |tx| {
916 Ok(tx_outcome((|| {
917 let mut record =
918 Self::load_process_conn(tx, &process_id)?.ok_or_else(|| {
919 lash_core::PluginError::Session(format!(
920 "unknown process `{process_id}`"
921 ))
922 })?;
923 if record.is_terminal() {
924 return Err(lash_core::PluginError::Session(format!(
925 "terminal process `{process_id}` cannot enter a wait state"
926 )));
927 }
928 record.wait = Some(wait);
929 record.updated_at_ms = current_epoch_ms();
930 Self::save_process_conn(tx, &record)?;
931 Ok(record)
932 })()))
933 })
934 .await
935 .map_err(process_sqlite_error)?
936 }
937
938 async fn clear_process_wait(
939 &self,
940 process_id: &str,
941 ) -> Result<ProcessRecord, lash_core::PluginError> {
942 let process_id = process_id.to_string();
943 self.conn
944 .write_flow(move |tx| {
945 Ok(tx_outcome((|| {
946 let mut record =
947 Self::load_process_conn(tx, &process_id)?.ok_or_else(|| {
948 lash_core::PluginError::Session(format!(
949 "unknown process `{process_id}`"
950 ))
951 })?;
952 record.wait = None;
953 record.updated_at_ms = current_epoch_ms();
954 Self::save_process_conn(tx, &record)?;
955 Ok(record)
956 })()))
957 })
958 .await
959 .map_err(process_sqlite_error)?
960 }
961
962 async fn get_process(&self, process_id: &str) -> Option<ProcessRecord> {
963 let process_id = process_id.to_string();
964 self.conn
965 .call(move |conn| Ok(Self::load_process_conn(conn, &process_id).ok().flatten()))
966 .await
967 .ok()
968 .flatten()
969 }
970
971 async fn list_processes(
972 &self,
973 filter: &lash_core::ProcessListFilter,
974 ) -> Result<Vec<ProcessRecord>, lash_core::PluginError> {
975 let filter = filter.clone();
976 self.conn
977 .call(move |conn| {
978 Ok((|| {
979 let mut stmt = conn
980 .prepare(
981 "SELECT record_json FROM processes
982 ORDER BY process_id ASC",
983 )
984 .map_err(process_sqlite_error)?;
985 let rows = stmt
986 .query_map([], |row| row.get::<_, String>(0))
987 .map_err(process_sqlite_error)?;
988 let mut records = Vec::new();
989 for row in rows {
990 let record: ProcessRecord =
991 serde_json::from_str(&row.map_err(process_sqlite_error)?)
992 .map_err(process_decode_error)?;
993 if filter.matches_record(&record) {
994 records.push(record);
995 }
996 }
997 Ok(records)
998 })())
999 })
1000 .await
1001 .map_err(process_sqlite_error)?
1002 }
1003
1004 async fn ack_wake(
1005 &self,
1006 process_id: &str,
1007 sequence: u64,
1008 ) -> Result<(), lash_core::PluginError> {
1009 let process_id = process_id.to_string();
1010 self.conn
1011 .call(move |conn| {
1012 Ok((|| {
1013 if Self::load_process_conn(conn, &process_id)?.is_none() {
1014 return Err(lash_core::PluginError::Session(format!(
1015 "unknown process `{process_id}`"
1016 )));
1017 }
1018 conn.execute(
1019 "INSERT OR IGNORE INTO process_wake_acks (process_id, sequence) VALUES (?1, ?2)",
1020 params![process_id, sequence as i64],
1021 )
1022 .map_err(process_sqlite_error)?;
1023 Ok(())
1024 })())
1025 })
1026 .await
1027 .map_err(process_sqlite_error)?
1028 }
1029
1030 async fn list_non_terminal(&self) -> Result<Vec<ProcessRecord>, lash_core::PluginError> {
1031 self.conn
1032 .call(move |conn| {
1033 Ok((|| {
1034 let mut stmt = conn
1035 .prepare(
1036 "SELECT record_json FROM processes
1037 WHERE status = 'running'
1038 ORDER BY process_id ASC",
1039 )
1040 .map_err(process_sqlite_error)?;
1041 let rows = stmt
1042 .query_map([], |row| row.get::<_, String>(0))
1043 .map_err(process_sqlite_error)?;
1044 let mut records = Vec::new();
1045 for row in rows {
1046 let record: ProcessRecord =
1047 serde_json::from_str(&row.map_err(process_sqlite_error)?)
1048 .map_err(process_decode_error)?;
1049 records.push(record);
1050 }
1051 Ok(records)
1052 })())
1053 })
1054 .await
1055 .map_err(process_sqlite_error)?
1056 }
1057
    /// Claims the lease on `process_id` for `owner_id`, valid for
    /// `lease_ttl_ms` from now.
    ///
    /// The claim is rejected only when an unexpired lease is held by a
    /// *different* owner; the same owner may re-claim, and an expired or
    /// released lease may be taken by anyone. Each successful claim stores a
    /// fencing token one greater than the one previously stored for the
    /// process (starting at 1) and a fresh lease token.
    ///
    /// # Errors
    /// Fails with a session error when the process is unknown or the lease is
    /// held by another owner, or with a mapped SQLite error.
    async fn claim_process_lease(
        &self,
        process_id: &str,
        owner_id: &str,
        lease_ttl_ms: u64,
    ) -> Result<ProcessLease, lash_core::PluginError> {
        let process_id = process_id.to_string();
        let owner_id = owner_id.to_string();
        self.conn
            .write_flow(move |tx| {
                Ok(tx_outcome((|| {
                    if Self::load_process_conn(tx, &process_id)?.is_none() {
                        return Err(lash_core::PluginError::Session(format!(
                            "unknown process `{process_id}`"
                        )));
                    }
                    let now = current_epoch_ms();
                    let current = Self::load_process_lease_conn(tx, &process_id)?;
                    // Conflict only if the lease is still live and owned by someone else.
                    if let Some(current) = current.as_ref()
                        && current.expires_at_epoch_ms > now
                        && current.owner_id != owner_id
                    {
                        return Err(process_lease_conflict(&process_id, current));
                    }
                    // Read the stored fencing token directly: `load_process_lease_conn`
                    // yields `None` for rows whose owner/token are NULL, but such rows
                    // still carry the last fencing token.
                    let fencing_token: u64 = tx
                        .query_row(
                            "SELECT lease_fencing_token FROM process_leases WHERE process_id = ?1",
                            params![process_id],
                            |row| row.get::<_, i64>(0),
                        )
                        .optional()
                        .map_err(process_sqlite_error)?
                        .unwrap_or(0) as u64
                        + 1;
                    let lease = ProcessLease {
                        schema_version: PROCESS_LEASE_SCHEMA_VERSION,
                        process_id: process_id.clone(),
                        owner_id: owner_id.clone(),
                        // The lease token is the hex SHA-256 of the claim's identifying fields.
                        lease_token: format!(
                            "{:x}",
                            Sha256::digest(
                                format!("{process_id}:{owner_id}:{now}:{fencing_token}").as_bytes()
                            )
                        ),
                        fencing_token,
                        claimed_at_epoch_ms: now,
                        expires_at_epoch_ms: now.saturating_add(lease_ttl_ms),
                    };
                    tx.execute(
                        "INSERT INTO process_leases (
                            process_id, lease_owner_id, lease_token, lease_fencing_token,
                            lease_claimed_at_ms, lease_expires_at_ms
                         )
                         VALUES (?1, ?2, ?3, ?4, ?5, ?6)
                         ON CONFLICT(process_id) DO UPDATE SET
                            lease_owner_id = excluded.lease_owner_id,
                            lease_token = excluded.lease_token,
                            lease_fencing_token = excluded.lease_fencing_token,
                            lease_claimed_at_ms = excluded.lease_claimed_at_ms,
                            lease_expires_at_ms = excluded.lease_expires_at_ms",
                        params![
                            lease.process_id.as_str(),
                            lease.owner_id.as_str(),
                            lease.lease_token.as_str(),
                            lease.fencing_token as i64,
                            lease.claimed_at_epoch_ms as i64,
                            lease.expires_at_epoch_ms as i64,
                        ],
                    )
                    .map_err(process_sqlite_error)?;
                    Ok(lease)
                })()))
            })
            .await
            .map_err(process_sqlite_error)?
    }
1138
1139 async fn renew_process_lease(
1140 &self,
1141 lease: &ProcessLease,
1142 lease_ttl_ms: u64,
1143 ) -> Result<ProcessLease, lash_core::PluginError> {
1144 let lease = lease.clone();
1145 self.conn
1146 .write_flow(move |tx| {
1147 Ok(tx_outcome((|| {
1148 let now = current_epoch_ms();
1149 let current = Self::load_process_lease_conn(tx, &lease.process_id)?;
1150 if !guard_lease(current.as_ref(), &lease.lease_token, now) {
1151 return Err(process_lease_expired(&lease.process_id));
1152 }
1153 let renewed = ProcessLease {
1154 expires_at_epoch_ms: now.saturating_add(lease_ttl_ms),
1155 ..lease.clone()
1156 };
1157 tx.execute(
1158 "UPDATE process_leases
1159 SET lease_expires_at_ms = ?2
1160 WHERE process_id = ?1 AND lease_token = ?3",
1161 params![
1162 renewed.process_id.as_str(),
1163 renewed.expires_at_epoch_ms as i64,
1164 renewed.lease_token.as_str(),
1165 ],
1166 )
1167 .map_err(process_sqlite_error)?;
1168 Ok(renewed)
1169 })()))
1170 })
1171 .await
1172 .map_err(process_sqlite_error)?
1173 }
1174
1175 async fn complete_process_lease(
1176 &self,
1177 completion: &ProcessLeaseCompletion,
1178 ) -> Result<(), lash_core::PluginError> {
1179 let process_id = completion.process_id.clone();
1180 let lease_token = completion.lease_token.clone();
1181 self.conn
1182 .call(move |conn| {
1183 conn.execute(
1184 "UPDATE process_leases
1185 SET lease_owner_id = NULL,
1186 lease_token = NULL,
1187 lease_claimed_at_ms = 0,
1188 lease_expires_at_ms = 0
1189 WHERE process_id = ?1 AND lease_token = ?2",
1190 params![process_id, lease_token],
1191 )
1192 })
1193 .await
1194 .map_err(process_sqlite_error)?;
1195 Ok(())
1196 }
1197}
1198
1199fn process_lease_conflict(process_id: &str, current: &ProcessLease) -> lash_core::PluginError {
1202 lash_core::PluginError::Session(format!(
1203 "process `{process_id}` is already leased by `{}` until {}",
1204 current.owner_id, current.expires_at_epoch_ms
1205 ))
1206}
1207
1208fn process_lease_expired(process_id: &str) -> lash_core::PluginError {
1210 lash_core::PluginError::Session(format!(
1211 "process lease for `{process_id}` is missing or expired"
1212 ))
1213}