1use super::*;
28
29fn process_status_label(record: &ProcessRecord) -> &'static str {
30 record.status.label()
31}
32
33impl SqliteProcessRegistry {
34 pub async fn open(path: &Path) -> tokio_rusqlite::Result<Self> {
35 let conn = SqliteConnection::open(path).await?;
36 ensure_process_schema(&conn).await?;
37 apply_pragmas(&conn, StoreBacking::File).await?;
38 Ok(Self {
39 conn,
40 notify: tokio::sync::Notify::new(),
41 })
42 }
43
44 pub async fn memory() -> tokio_rusqlite::Result<Self> {
45 let conn = SqliteConnection::open_in_memory().await?;
46 ensure_process_schema(&conn).await?;
47 apply_pragmas(&conn, StoreBacking::Memory).await?;
48 Ok(Self {
49 conn,
50 notify: tokio::sync::Notify::new(),
51 })
52 }
53
54 fn load_process_conn(
55 conn: &Connection,
56 process_id: &str,
57 ) -> Result<Option<ProcessRecord>, lash_core::PluginError> {
58 let json: Option<String> = conn
59 .query_row(
60 "SELECT record_json FROM processes WHERE process_id = ?1",
61 params![process_id],
62 |row| row.get(0),
63 )
64 .optional()
65 .map_err(process_sqlite_error)?;
66 json.map(|json| serde_json::from_str(&json).map_err(process_decode_error))
67 .transpose()
68 }
69
70 fn save_process_conn(
71 conn: &Connection,
72 record: &ProcessRecord,
73 ) -> Result<(), lash_core::PluginError> {
74 conn.execute(
75 "UPDATE processes
76 SET updated_at_ms = ?2, status = ?3, record_json = ?4
77 WHERE process_id = ?1",
78 params![
79 record.id.as_str(),
80 record.updated_at_ms as i64,
81 process_status_label(record),
82 process_encode_json(record)?
83 ],
84 )
85 .map_err(process_sqlite_error)?;
86 Ok(())
87 }
88
89 fn load_event_by_key_conn(
90 conn: &Connection,
91 process_id: &str,
92 replay_key: &str,
93 ) -> Result<Option<(String, ProcessEvent)>, lash_core::PluginError> {
94 let row: Option<(String, String)> = conn
95 .query_row(
96 "SELECT payload_hash, event_json
97 FROM process_events
98 WHERE process_id = ?1 AND idempotency_key = ?2",
99 params![process_id, replay_key],
100 |row| Ok((row.get(0)?, row.get(1)?)),
101 )
102 .optional()
103 .map_err(process_sqlite_error)?;
104 row.map(|(hash, json)| {
105 serde_json::from_str(&json)
106 .map(|event| (hash, event))
107 .map_err(process_decode_error)
108 })
109 .transpose()
110 }
111
112 fn load_process_lease_conn(
113 conn: &Connection,
114 process_id: &str,
115 ) -> Result<Option<ProcessLease>, lash_core::PluginError> {
116 conn.query_row(
117 "SELECT lease_owner_id, lease_token, lease_fencing_token,
118 lease_claimed_at_ms, lease_expires_at_ms
119 FROM process_leases
120 WHERE process_id = ?1",
121 params![process_id],
122 |row| {
123 let owner_id: Option<String> = row.get(0)?;
124 let lease_token: Option<String> = row.get(1)?;
125 let (Some(owner_id), Some(lease_token)) = (owner_id, lease_token) else {
126 return Ok(None);
127 };
128 Ok(Some(ProcessLease {
129 schema_version: PROCESS_LEASE_SCHEMA_VERSION,
130 process_id: process_id.to_string(),
131 owner_id,
132 lease_token,
133 fencing_token: row.get::<_, i64>(2)? as u64,
134 claimed_at_epoch_ms: row.get::<_, i64>(3)? as u64,
135 expires_at_epoch_ms: row.get::<_, i64>(4)? as u64,
136 }))
137 },
138 )
139 .optional()
140 .map(|lease| lease.flatten())
141 .map_err(process_sqlite_error)
142 }
143
144 fn list_grants_for_scope_conn(
145 conn: &Connection,
146 session_scope: &SessionScope,
147 live_only: bool,
148 ) -> Result<Vec<ProcessHandleGrantEntry>, lash_core::PluginError> {
149 let session_scope_id = session_scope.id();
150 let status_clause = if live_only {
151 "AND p.status = 'running'"
152 } else {
153 ""
154 };
155 let mut stmt = conn
156 .prepare(&format!(
157 "SELECT g.process_id, g.descriptor_json, p.record_json
158 FROM process_handle_grants g
159 JOIN processes p ON p.process_id = g.process_id
160 WHERE g.scope_id = ?1 {status_clause}
161 ORDER BY g.process_id ASC"
162 ))
163 .map_err(process_sqlite_error)?;
164 let rows = stmt
165 .query_map(params![session_scope_id.as_str()], |row| {
166 Ok((
167 row.get::<_, String>(0)?,
168 row.get::<_, String>(1)?,
169 row.get::<_, String>(2)?,
170 ))
171 })
172 .map_err(process_sqlite_error)?;
173 let mut entries = Vec::new();
174 for row in rows {
175 let (process_id, descriptor_json, record_json) = row.map_err(process_sqlite_error)?;
176 let descriptor: ProcessHandleDescriptor =
177 serde_json::from_str(&descriptor_json).map_err(process_decode_error)?;
178 let record: ProcessRecord =
179 serde_json::from_str(&record_json).map_err(process_decode_error)?;
180 entries.push((
181 ProcessHandleGrant {
182 session_id: session_scope.session_id.clone(),
183 process_id,
184 descriptor,
185 },
186 record,
187 ));
188 }
189 Ok(entries)
190 }
191}
192
193fn tx_outcome<T>(
198 result: Result<T, lash_core::PluginError>,
199) -> TxOutcome<Result<T, lash_core::PluginError>> {
200 match result {
201 Ok(value) => TxOutcome::Commit(Ok(value)),
202 Err(err) => TxOutcome::Rollback(Err(err)),
203 }
204}
205
206#[async_trait::async_trait]
207impl ProcessRegistry for SqliteProcessRegistry {
208 fn durability_tier(&self) -> DurabilityTier {
209 DurabilityTier::Durable
210 }
211
212 async fn register_process(
213 &self,
214 registration: ProcessRegistration,
215 ) -> Result<ProcessRecord, lash_core::PluginError> {
216 let (registration, registration_hash) = prepare_process_registration(registration)?;
217 let record = self
218 .conn
219 .write_flow(move |tx| {
220 Ok(tx_outcome((|| {
221 if let Some(existing) = Self::load_process_conn(tx, ®istration.id)? {
222 if existing.registration_hash == registration_hash {
223 return Ok(existing);
224 }
225 return Err(lash_core::PluginError::Session(format!(
226 "process `{}` registration hash conflict: existing {}, new {}",
227 registration.id, existing.registration_hash, registration_hash
228 )));
229 }
230 let now = current_epoch_ms();
231 let record = ProcessRecord::from_prepared_registration(
232 registration,
233 registration_hash,
234 now,
235 );
236 let originator_scope_id = record.originator_scope_id();
237 tx.execute(
238 "INSERT INTO processes (
239 process_id, registration_hash, owner_scope_id, host_profile_id,
240 created_at_ms, updated_at_ms, status, record_json
241 )
242 VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8)",
243 params![
244 record.id.as_str(),
245 record.registration_hash.as_str(),
246 originator_scope_id.as_str(),
247 record.host_profile_id(),
248 record.created_at_ms as i64,
249 record.updated_at_ms as i64,
250 process_status_label(&record),
251 process_encode_json(&record)?,
252 ],
253 )
254 .map_err(process_sqlite_error)?;
255 Ok(record)
256 })()))
257 })
258 .await
259 .map_err(process_sqlite_error)??;
260 self.notify.notify_waiters();
261 Ok(record)
262 }
263
264 async fn set_external_ref(
265 &self,
266 process_id: &str,
267 external_ref: ProcessExternalRef,
268 ) -> Result<ProcessRecord, lash_core::PluginError> {
269 let process_id = process_id.to_string();
270 let record = self
271 .conn
272 .write_flow(move |tx| {
273 Ok(tx_outcome((|| {
274 let mut record =
275 Self::load_process_conn(tx, &process_id)?.ok_or_else(|| {
276 lash_core::PluginError::Session(format!(
277 "unknown process `{process_id}`"
278 ))
279 })?;
280 record.external_ref = Some(external_ref);
281 record.updated_at_ms = current_epoch_ms();
282 Self::save_process_conn(tx, &record)?;
283 Ok(record)
284 })()))
285 })
286 .await
287 .map_err(process_sqlite_error)??;
288 self.notify.notify_waiters();
289 Ok(record)
290 }
291
292 async fn grant_handle(
293 &self,
294 session_scope: &SessionScope,
295 process_id: &str,
296 descriptor: ProcessHandleDescriptor,
297 ) -> Result<ProcessHandleGrant, lash_core::PluginError> {
298 let session_scope = session_scope.clone();
299 let process_id = process_id.to_string();
300 self.conn
301 .write_flow(move |tx| {
302 Ok(tx_outcome((|| {
303 let session_scope_id = session_scope.id();
304 if Self::load_process_conn(tx, &process_id)?.is_none() {
305 return Err(lash_core::PluginError::Session(format!(
306 "unknown process `{process_id}`"
307 )));
308 }
309 tx.execute(
310 "INSERT INTO process_handle_grants (session_id, scope_id, process_id, descriptor_json)
311 VALUES (?1, ?2, ?3, ?4)
312 ON CONFLICT(scope_id, process_id) DO UPDATE SET
313 session_id = excluded.session_id,
314 descriptor_json = excluded.descriptor_json",
315 params![
316 session_scope.session_id.as_str(),
317 session_scope_id.as_str(),
318 process_id.as_str(),
319 process_encode_json(&descriptor)?
320 ],
321 )
322 .map_err(process_sqlite_error)?;
323 Ok(ProcessHandleGrant {
324 session_id: session_scope.session_id.clone(),
325 process_id: process_id.clone(),
326 descriptor,
327 })
328 })()))
329 })
330 .await
331 .map_err(process_sqlite_error)?
332 }
333
334 async fn revoke_handle(
335 &self,
336 session_scope: &SessionScope,
337 process_id: &str,
338 ) -> Result<(), lash_core::PluginError> {
339 let session_scope_id = session_scope.id().as_str().to_string();
340 let process_id = process_id.to_string();
341 self.conn
342 .call(move |conn| {
343 conn.execute(
344 "DELETE FROM process_handle_grants WHERE scope_id = ?1 AND process_id = ?2",
345 params![session_scope_id, process_id],
346 )
347 })
348 .await
349 .map_err(process_sqlite_error)?;
350 Ok(())
351 }
352
353 async fn transfer_handle_grants(
354 &self,
355 from_scope: &SessionScope,
356 to_scope: &SessionScope,
357 process_ids: &[String],
358 ) -> Result<(), lash_core::PluginError> {
359 let from_scope = from_scope.clone();
360 let to_scope = to_scope.clone();
361 let process_ids = process_ids.to_vec();
362 self.conn
363 .write_flow(move |tx| {
364 Ok(tx_outcome((|| {
365 let from_scope_id = from_scope.id();
366 let to_scope_id = to_scope.id();
367 for process_id in &process_ids {
368 let descriptor_json: Option<String> = tx
369 .query_row(
370 "SELECT descriptor_json
371 FROM process_handle_grants
372 WHERE scope_id = ?1 AND process_id = ?2",
373 params![from_scope_id.as_str(), process_id.as_str()],
374 |row| row.get(0),
375 )
376 .optional()
377 .map_err(process_sqlite_error)?;
378 let Some(descriptor_json) = descriptor_json else {
379 return Err(lash_core::PluginError::Session(format!(
380 "process handle `{process_id}` is not granted to session `{}`",
381 from_scope.session_id
382 )));
383 };
384 tx.execute(
385 "DELETE FROM process_handle_grants
386 WHERE scope_id = ?1 AND process_id = ?2",
387 params![from_scope_id.as_str(), process_id.as_str()],
388 )
389 .map_err(process_sqlite_error)?;
390 tx.execute(
391 "INSERT INTO process_handle_grants (session_id, scope_id, process_id, descriptor_json)
392 VALUES (?1, ?2, ?3, ?4)
393 ON CONFLICT(scope_id, process_id) DO UPDATE SET
394 session_id = excluded.session_id,
395 descriptor_json = excluded.descriptor_json",
396 params![
397 to_scope.session_id.as_str(),
398 to_scope_id.as_str(),
399 process_id.as_str(),
400 descriptor_json
401 ],
402 )
403 .map_err(process_sqlite_error)?;
404 }
405 Ok(())
406 })()))
407 })
408 .await
409 .map_err(process_sqlite_error)?
410 }
411
412 async fn list_handle_grants(
413 &self,
414 session_scope: &SessionScope,
415 ) -> Result<Vec<ProcessHandleGrantEntry>, lash_core::PluginError> {
416 let session_scope = session_scope.clone();
417 self.conn
418 .call(move |conn| {
419 Ok(Self::list_grants_for_scope_conn(
420 conn,
421 &session_scope,
422 false,
423 ))
424 })
425 .await
426 .map_err(process_sqlite_error)?
427 }
428
429 async fn list_live_handle_grants(
430 &self,
431 session_scope: &SessionScope,
432 ) -> Result<Vec<ProcessHandleGrantEntry>, lash_core::PluginError> {
433 let session_scope = session_scope.clone();
434 self.conn
435 .call(move |conn| Ok(Self::list_grants_for_scope_conn(conn, &session_scope, true)))
436 .await
437 .map_err(process_sqlite_error)?
438 }
439
440 async fn has_handle_grant(
441 &self,
442 session_scope: &SessionScope,
443 process_id: &str,
444 ) -> Result<bool, lash_core::PluginError> {
445 let session_scope_id = session_scope.id().as_str().to_string();
446 let process_id = process_id.to_string();
447 self.conn
448 .call(move |conn| {
449 let exists = conn
450 .query_row(
451 "SELECT 1
452 FROM process_handle_grants g
453 JOIN processes p ON p.process_id = g.process_id
454 WHERE g.scope_id = ?1 AND g.process_id = ?2
455 LIMIT 1",
456 params![session_scope_id, process_id],
457 |_| Ok(()),
458 )
459 .optional()?
460 .is_some();
461 Ok(exists)
462 })
463 .await
464 .map_err(process_sqlite_error)
465 }
466
467 async fn handle_grants_for_process(
468 &self,
469 process_id: &str,
470 ) -> Result<Vec<ProcessHandleGrant>, lash_core::PluginError> {
471 let process_id = process_id.to_string();
472 self.conn
473 .call(move |conn| {
474 Ok((|| {
475 if Self::load_process_conn(conn, &process_id)?.is_none() {
476 return Err(lash_core::PluginError::Session(format!(
477 "unknown process `{process_id}`"
478 )));
479 }
480 let mut stmt = conn
481 .prepare(
482 "SELECT session_id, descriptor_json
483 FROM process_handle_grants
484 WHERE process_id = ?1
485 ORDER BY session_id ASC, scope_id ASC",
486 )
487 .map_err(process_sqlite_error)?;
488 let rows = stmt
489 .query_map(params![process_id], |row| {
490 Ok((row.get::<_, String>(0)?, row.get::<_, String>(1)?))
491 })
492 .map_err(process_sqlite_error)?;
493 let mut grants = Vec::new();
494 for row in rows {
495 let (session_id, descriptor_json) = row.map_err(process_sqlite_error)?;
496 let descriptor: ProcessHandleDescriptor =
497 serde_json::from_str(&descriptor_json).map_err(process_decode_error)?;
498 grants.push(ProcessHandleGrant {
499 session_id,
500 process_id: process_id.clone(),
501 descriptor,
502 });
503 }
504 Ok(grants)
505 })())
506 })
507 .await
508 .map_err(process_sqlite_error)?
509 }
510
511 async fn delete_session_process_state(
512 &self,
513 session_id: &str,
514 ) -> Result<lash_core::ProcessSessionDeleteReport, lash_core::PluginError> {
515 let session_id_owned = session_id.to_string();
516 let (
517 revoked_handle_count,
518 deleted_wake_count,
519 mut orphaned_process_ids,
520 mut preserved_process_ids,
521 ) = self
522 .conn
523 .write_flow(move |tx| {
524 Ok(tx_outcome((|| {
525 let session_id = session_id_owned;
526 let removed = {
527 let mut stmt = tx
528 .prepare(
529 "SELECT g.process_id, p.record_json
530 FROM process_handle_grants g
531 JOIN processes p ON p.process_id = g.process_id
532 WHERE g.session_id = ?1
533 ORDER BY g.process_id ASC",
534 )
535 .map_err(process_sqlite_error)?;
536 let rows = stmt
537 .query_map(params![session_id], |row| {
538 Ok((row.get::<_, String>(0)?, row.get::<_, String>(1)?))
539 })
540 .map_err(process_sqlite_error)?;
541 let mut removed = Vec::new();
542 for row in rows {
543 let (process_id, record_json) = row.map_err(process_sqlite_error)?;
544 let record: ProcessRecord =
545 serde_json::from_str(&record_json).map_err(process_decode_error)?;
546 removed.push((process_id, record));
547 }
548 removed
549 };
550
551 let deleted_wake_count = 0;
556 let revoked_handle_count = tx
557 .execute(
558 "DELETE FROM process_handle_grants WHERE session_id = ?1",
559 params![session_id],
560 )
561 .map_err(process_sqlite_error)?;
562 let mut orphaned_process_ids = Vec::new();
563 let mut preserved_process_ids = Vec::new();
564 for (process_id, record) in removed {
565 if record.is_terminal() {
566 continue;
567 }
568 let remaining_grants: i64 = tx
569 .query_row(
570 "SELECT COUNT(*) FROM process_handle_grants WHERE process_id = ?1",
571 params![process_id],
572 |row| row.get(0),
573 )
574 .map_err(process_sqlite_error)?;
575 if remaining_grants == 0 {
576 orphaned_process_ids.push(process_id);
577 } else {
578 preserved_process_ids.push(process_id);
579 }
580 }
581 Ok((
582 revoked_handle_count,
583 deleted_wake_count,
584 orphaned_process_ids,
585 preserved_process_ids,
586 ))
587 })()))
588 })
589 .await
590 .map_err(process_sqlite_error)??;
591 orphaned_process_ids.sort();
592 orphaned_process_ids.dedup();
593 preserved_process_ids.sort();
594 preserved_process_ids.dedup();
595 Ok(lash_core::ProcessSessionDeleteReport {
596 session_id: session_id.to_string(),
597 revoked_handle_count,
598 deleted_wake_count,
599 orphaned_process_ids,
600 preserved_process_ids,
601 })
602 }
603
604 async fn append_event(
605 &self,
606 process_id: &str,
607 request: ProcessEventAppendRequest,
608 ) -> Result<ProcessEventAppendResult, lash_core::PluginError> {
609 let process_id = process_id.to_string();
610 let (result, appended) = self
611 .conn
612 .write_flow(move |tx| {
613 Ok(tx_outcome((|| {
614 let mut record =
615 Self::load_process_conn(tx, &process_id)?.ok_or_else(|| {
616 lash_core::PluginError::Session(format!(
617 "unknown process `{process_id}`"
618 ))
619 })?;
620 let replay_lookup = if let Some(replay_key) =
621 request.replay.as_ref().map(|replay| replay.key.as_str())
622 {
623 Self::load_event_by_key_conn(tx, &process_id, replay_key)?
624 } else {
625 None
626 };
627 let sequence = tx
628 .query_row(
629 "SELECT COALESCE(MAX(sequence), 0) + 1 FROM process_events WHERE process_id = ?1",
630 params![process_id],
631 |row| row.get::<_, i64>(0),
632 )
633 .map_err(process_sqlite_error)? as u64;
634 let occurred_at_ms = current_epoch_ms();
635 let prepared = prepare_process_event_append(
636 &record,
637 request,
638 sequence,
639 replay_lookup,
640 occurred_at_ms,
641 )?;
642 if prepared.replayed {
643 return Ok((
644 ProcessEventAppendResult {
645 event: prepared.event,
646 wake_delivery: prepared.wake_delivery,
647 },
648 false,
649 ));
650 }
651 let event = prepared.event;
652 tx.execute(
653 "INSERT INTO process_events (
654 process_id, sequence, event_type, payload_hash, idempotency_key,
655 occurred_at_ms, event_json
656 )
657 VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7)",
658 params![
659 process_id,
660 sequence as i64,
661 event.event_type.as_str(),
662 prepared.payload_hash.as_str(),
663 event.invocation.replay_key(),
664 prepared.occurred_at_ms as i64,
665 process_encode_json(&event)?,
666 ],
667 )
668 .map_err(process_sqlite_error)?;
669 if let Some(status) = prepared.status_update.clone() {
670 record.status = status;
671 if record.status.is_terminal() {
672 record.wait = None;
673 }
674 }
675 record.updated_at_ms = prepared.occurred_at_ms;
676 Self::save_process_conn(tx, &record)?;
677 Ok((
678 ProcessEventAppendResult {
679 event,
680 wake_delivery: prepared.wake_delivery,
681 },
682 true,
683 ))
684 })()))
685 })
686 .await
687 .map_err(process_sqlite_error)??;
688 if appended {
689 self.notify.notify_waiters();
690 }
691 Ok(result)
692 }
693
694 async fn events_after(
695 &self,
696 process_id: &str,
697 after_sequence: u64,
698 ) -> Result<Vec<ProcessEvent>, lash_core::PluginError> {
699 let process_id = process_id.to_string();
700 self.conn
701 .call(move |conn| {
702 Ok((|| {
703 if Self::load_process_conn(conn, &process_id)?.is_none() {
704 return Err(lash_core::PluginError::Session(format!(
705 "unknown process `{process_id}`"
706 )));
707 }
708 let mut stmt = conn
709 .prepare(
710 "SELECT event_json FROM process_events
711 WHERE process_id = ?1 AND sequence > ?2
712 ORDER BY sequence ASC",
713 )
714 .map_err(process_sqlite_error)?;
715 let rows = stmt
716 .query_map(params![process_id, after_sequence as i64], |row| {
717 row.get::<_, String>(0)
718 })
719 .map_err(process_sqlite_error)?;
720 let mut events = Vec::new();
721 for row in rows {
722 events.push(
723 serde_json::from_str(&row.map_err(process_sqlite_error)?)
724 .map_err(process_decode_error)?,
725 );
726 }
727 Ok(events)
728 })())
729 })
730 .await
731 .map_err(process_sqlite_error)?
732 }
733
734 async fn wake_events_after(
735 &self,
736 process_id: &str,
737 after_sequence: u64,
738 ) -> Result<Vec<ProcessEvent>, lash_core::PluginError> {
739 let acked: std::collections::HashSet<u64> = {
740 let process_id = process_id.to_string();
741 self.conn
742 .call(move |conn| {
743 Ok(
744 (|| -> Result<std::collections::HashSet<u64>, lash_core::PluginError> {
745 let mut stmt = conn
746 .prepare(
747 "SELECT sequence FROM process_wake_acks WHERE process_id = ?1",
748 )
749 .map_err(process_sqlite_error)?;
750 let rows = stmt
751 .query_map(params![process_id], |row| row.get::<_, i64>(0))
752 .map_err(process_sqlite_error)?;
753 let mut set = std::collections::HashSet::new();
754 for row in rows {
755 set.insert(row.map_err(process_sqlite_error)? as u64);
756 }
757 Ok(set)
758 })(),
759 )
760 })
761 .await
762 .map_err(process_sqlite_error)??
763 };
764 Ok(self
765 .events_after(process_id, after_sequence)
766 .await?
767 .into_iter()
768 .filter(|event| event.semantics.wake.is_some() && !acked.contains(&event.sequence))
769 .collect())
770 }
771
772 async fn wait_event_after(
773 &self,
774 process_id: &str,
775 event_type: &str,
776 after_sequence: u64,
777 ) -> Result<ProcessEvent, lash_core::PluginError> {
778 loop {
779 if let Some(event) = self
780 .events_after(process_id, after_sequence)
781 .await?
782 .into_iter()
783 .find(|event| event.event_type == event_type)
784 {
785 return Ok(event);
786 }
787 tokio::select! {
788 _ = self.notify.notified() => {}
789 _ = tokio::time::sleep(Duration::from_millis(50)) => {}
790 }
791 }
792 }
793
794 async fn await_process(
795 &self,
796 process_id: &str,
797 ) -> Result<ProcessAwaitOutput, lash_core::PluginError> {
798 loop {
799 let record = self.get_process(process_id).await.ok_or_else(|| {
800 lash_core::PluginError::Session(format!("unknown process `{process_id}`"))
801 })?;
802 if let Some(await_output) = record.status.await_output() {
803 return Ok(await_output.clone());
804 }
805 tokio::select! {
806 _ = self.notify.notified() => {}
807 _ = tokio::time::sleep(Duration::from_millis(50)) => {}
808 }
809 }
810 }
811
812 async fn complete_process(
813 &self,
814 process_id: &str,
815 await_output: ProcessAwaitOutput,
816 ) -> Result<ProcessRecord, lash_core::PluginError> {
817 let event_type = match await_output.terminal_state() {
818 lash_core::ProcessTerminalState::Completed => "process.completed",
819 lash_core::ProcessTerminalState::Failed => "process.failed",
820 lash_core::ProcessTerminalState::Cancelled => "process.cancelled",
821 };
822 self.append_event(
823 process_id,
824 ProcessEventAppendRequest::new(
825 event_type,
826 serde_json::json!({ "await_output": await_output }),
827 )
828 .with_replay_key(format!("process:{process_id}:terminal:{event_type}")),
829 )
830 .await?;
831 self.get_process(process_id).await.ok_or_else(|| {
832 lash_core::PluginError::Session(format!(
833 "unknown process `{process_id}` after terminal event"
834 ))
835 })
836 }
837
838 async fn set_process_wait(
839 &self,
840 process_id: &str,
841 wait: lash_core::WaitState,
842 ) -> Result<ProcessRecord, lash_core::PluginError> {
843 let process_id = process_id.to_string();
844 self.conn
845 .write_flow(move |tx| {
846 Ok(tx_outcome((|| {
847 let mut record =
848 Self::load_process_conn(tx, &process_id)?.ok_or_else(|| {
849 lash_core::PluginError::Session(format!(
850 "unknown process `{process_id}`"
851 ))
852 })?;
853 if record.is_terminal() {
854 return Err(lash_core::PluginError::Session(format!(
855 "terminal process `{process_id}` cannot enter a wait state"
856 )));
857 }
858 record.wait = Some(wait);
859 record.updated_at_ms = current_epoch_ms();
860 Self::save_process_conn(tx, &record)?;
861 Ok(record)
862 })()))
863 })
864 .await
865 .map_err(process_sqlite_error)?
866 }
867
868 async fn clear_process_wait(
869 &self,
870 process_id: &str,
871 ) -> Result<ProcessRecord, lash_core::PluginError> {
872 let process_id = process_id.to_string();
873 self.conn
874 .write_flow(move |tx| {
875 Ok(tx_outcome((|| {
876 let mut record =
877 Self::load_process_conn(tx, &process_id)?.ok_or_else(|| {
878 lash_core::PluginError::Session(format!(
879 "unknown process `{process_id}`"
880 ))
881 })?;
882 record.wait = None;
883 record.updated_at_ms = current_epoch_ms();
884 Self::save_process_conn(tx, &record)?;
885 Ok(record)
886 })()))
887 })
888 .await
889 .map_err(process_sqlite_error)?
890 }
891
892 async fn get_process(&self, process_id: &str) -> Option<ProcessRecord> {
893 let process_id = process_id.to_string();
894 self.conn
895 .call(move |conn| Ok(Self::load_process_conn(conn, &process_id).ok().flatten()))
896 .await
897 .ok()
898 .flatten()
899 }
900
901 async fn list_processes(
902 &self,
903 filter: &lash_core::ProcessListFilter,
904 ) -> Result<Vec<ProcessRecord>, lash_core::PluginError> {
905 let filter = filter.clone();
906 self.conn
907 .call(move |conn| {
908 Ok((|| {
909 let mut stmt = conn
910 .prepare(
911 "SELECT record_json FROM processes
912 ORDER BY process_id ASC",
913 )
914 .map_err(process_sqlite_error)?;
915 let rows = stmt
916 .query_map([], |row| row.get::<_, String>(0))
917 .map_err(process_sqlite_error)?;
918 let mut records = Vec::new();
919 for row in rows {
920 let record: ProcessRecord =
921 serde_json::from_str(&row.map_err(process_sqlite_error)?)
922 .map_err(process_decode_error)?;
923 if filter.matches_record(&record) {
924 records.push(record);
925 }
926 }
927 Ok(records)
928 })())
929 })
930 .await
931 .map_err(process_sqlite_error)?
932 }
933
934 async fn ack_wake(
935 &self,
936 process_id: &str,
937 sequence: u64,
938 ) -> Result<(), lash_core::PluginError> {
939 let process_id = process_id.to_string();
940 self.conn
941 .call(move |conn| {
942 Ok((|| {
943 if Self::load_process_conn(conn, &process_id)?.is_none() {
944 return Err(lash_core::PluginError::Session(format!(
945 "unknown process `{process_id}`"
946 )));
947 }
948 conn.execute(
949 "INSERT OR IGNORE INTO process_wake_acks (process_id, sequence) VALUES (?1, ?2)",
950 params![process_id, sequence as i64],
951 )
952 .map_err(process_sqlite_error)?;
953 Ok(())
954 })())
955 })
956 .await
957 .map_err(process_sqlite_error)?
958 }
959
960 async fn list_non_terminal(&self) -> Result<Vec<ProcessRecord>, lash_core::PluginError> {
961 self.conn
962 .call(move |conn| {
963 Ok((|| {
964 let mut stmt = conn
965 .prepare(
966 "SELECT record_json FROM processes
967 WHERE status = 'running'
968 ORDER BY process_id ASC",
969 )
970 .map_err(process_sqlite_error)?;
971 let rows = stmt
972 .query_map([], |row| row.get::<_, String>(0))
973 .map_err(process_sqlite_error)?;
974 let mut records = Vec::new();
975 for row in rows {
976 let record: ProcessRecord =
977 serde_json::from_str(&row.map_err(process_sqlite_error)?)
978 .map_err(process_decode_error)?;
979 records.push(record);
980 }
981 Ok(records)
982 })())
983 })
984 .await
985 .map_err(process_sqlite_error)?
986 }
987
988 async fn claim_process_lease(
989 &self,
990 process_id: &str,
991 owner_id: &str,
992 lease_ttl_ms: u64,
993 ) -> Result<ProcessLease, lash_core::PluginError> {
994 let process_id = process_id.to_string();
995 let owner_id = owner_id.to_string();
996 self.conn
997 .write_flow(move |tx| {
998 Ok(tx_outcome((|| {
999 if Self::load_process_conn(tx, &process_id)?.is_none() {
1000 return Err(lash_core::PluginError::Session(format!(
1001 "unknown process `{process_id}`"
1002 )));
1003 }
1004 let now = current_epoch_ms();
1005 let current = Self::load_process_lease_conn(tx, &process_id)?;
1006 if let Some(current) = current.as_ref()
1007 && current.expires_at_epoch_ms > now
1008 && current.owner_id != owner_id
1009 {
1010 return Err(process_lease_conflict(&process_id, current));
1011 }
1012 let fencing_token: u64 = tx
1017 .query_row(
1018 "SELECT lease_fencing_token FROM process_leases WHERE process_id = ?1",
1019 params![process_id],
1020 |row| row.get::<_, i64>(0),
1021 )
1022 .optional()
1023 .map_err(process_sqlite_error)?
1024 .unwrap_or(0) as u64
1025 + 1;
1026 let lease = ProcessLease {
1027 schema_version: PROCESS_LEASE_SCHEMA_VERSION,
1028 process_id: process_id.clone(),
1029 owner_id: owner_id.clone(),
1030 lease_token: format!(
1031 "{:x}",
1032 Sha256::digest(
1033 format!("{process_id}:{owner_id}:{now}:{fencing_token}").as_bytes()
1034 )
1035 ),
1036 fencing_token,
1037 claimed_at_epoch_ms: now,
1038 expires_at_epoch_ms: now.saturating_add(lease_ttl_ms),
1039 };
1040 tx.execute(
1041 "INSERT INTO process_leases (
1042 process_id, lease_owner_id, lease_token, lease_fencing_token,
1043 lease_claimed_at_ms, lease_expires_at_ms
1044 )
1045 VALUES (?1, ?2, ?3, ?4, ?5, ?6)
1046 ON CONFLICT(process_id) DO UPDATE SET
1047 lease_owner_id = excluded.lease_owner_id,
1048 lease_token = excluded.lease_token,
1049 lease_fencing_token = excluded.lease_fencing_token,
1050 lease_claimed_at_ms = excluded.lease_claimed_at_ms,
1051 lease_expires_at_ms = excluded.lease_expires_at_ms",
1052 params![
1053 lease.process_id.as_str(),
1054 lease.owner_id.as_str(),
1055 lease.lease_token.as_str(),
1056 lease.fencing_token as i64,
1057 lease.claimed_at_epoch_ms as i64,
1058 lease.expires_at_epoch_ms as i64,
1059 ],
1060 )
1061 .map_err(process_sqlite_error)?;
1062 Ok(lease)
1063 })()))
1064 })
1065 .await
1066 .map_err(process_sqlite_error)?
1067 }
1068
1069 async fn renew_process_lease(
1070 &self,
1071 lease: &ProcessLease,
1072 lease_ttl_ms: u64,
1073 ) -> Result<ProcessLease, lash_core::PluginError> {
1074 let lease = lease.clone();
1075 self.conn
1076 .write_flow(move |tx| {
1077 Ok(tx_outcome((|| {
1078 let now = current_epoch_ms();
1079 let current = Self::load_process_lease_conn(tx, &lease.process_id)?;
1080 if !guard_lease(current.as_ref(), &lease.lease_token, now) {
1081 return Err(process_lease_expired(&lease.process_id));
1082 }
1083 let renewed = ProcessLease {
1084 expires_at_epoch_ms: now.saturating_add(lease_ttl_ms),
1085 ..lease.clone()
1086 };
1087 tx.execute(
1088 "UPDATE process_leases
1089 SET lease_expires_at_ms = ?2
1090 WHERE process_id = ?1 AND lease_token = ?3",
1091 params![
1092 renewed.process_id.as_str(),
1093 renewed.expires_at_epoch_ms as i64,
1094 renewed.lease_token.as_str(),
1095 ],
1096 )
1097 .map_err(process_sqlite_error)?;
1098 Ok(renewed)
1099 })()))
1100 })
1101 .await
1102 .map_err(process_sqlite_error)?
1103 }
1104
1105 async fn complete_process_lease(
1106 &self,
1107 completion: &ProcessLeaseCompletion,
1108 ) -> Result<(), lash_core::PluginError> {
1109 let process_id = completion.process_id.clone();
1110 let lease_token = completion.lease_token.clone();
1111 self.conn
1112 .call(move |conn| {
1113 conn.execute(
1114 "UPDATE process_leases
1115 SET lease_owner_id = NULL,
1116 lease_token = NULL,
1117 lease_claimed_at_ms = 0,
1118 lease_expires_at_ms = 0
1119 WHERE process_id = ?1 AND lease_token = ?2",
1120 params![process_id, lease_token],
1121 )
1122 })
1123 .await
1124 .map_err(process_sqlite_error)?;
1125 Ok(())
1126 }
1127}
1128
1129fn process_lease_conflict(process_id: &str, current: &ProcessLease) -> lash_core::PluginError {
1132 lash_core::PluginError::Session(format!(
1133 "process `{process_id}` is already leased by `{}` until {}",
1134 current.owner_id, current.expires_at_epoch_ms
1135 ))
1136}
1137
1138fn process_lease_expired(process_id: &str) -> lash_core::PluginError {
1140 lash_core::PluginError::Session(format!(
1141 "process lease for `{process_id}` is missing or expired"
1142 ))
1143}