1use super::*;
28
29fn process_status_label(record: &ProcessRecord) -> &'static str {
30 record.status.label()
31}
32
33impl SqliteProcessRegistry {
34 pub async fn open(path: &Path) -> tokio_rusqlite::Result<Self> {
35 let conn = SqliteConnection::open(path).await?;
36 ensure_process_schema(&conn).await?;
37 apply_pragmas(&conn, StoreBacking::File).await?;
38 Ok(Self {
39 conn,
40 notify: tokio::sync::Notify::new(),
41 })
42 }
43
44 pub async fn memory() -> tokio_rusqlite::Result<Self> {
45 let conn = SqliteConnection::open_in_memory().await?;
46 ensure_process_schema(&conn).await?;
47 apply_pragmas(&conn, StoreBacking::Memory).await?;
48 Ok(Self {
49 conn,
50 notify: tokio::sync::Notify::new(),
51 })
52 }
53
54 fn load_process_conn(
55 conn: &Connection,
56 process_id: &str,
57 ) -> Result<Option<ProcessRecord>, lash_core::PluginError> {
58 let json: Option<String> = conn
59 .query_row(
60 "SELECT record_json FROM processes WHERE process_id = ?1",
61 params![process_id],
62 |row| row.get(0),
63 )
64 .optional()
65 .map_err(process_sqlite_error)?;
66 json.map(|json| serde_json::from_str(&json).map_err(process_decode_error))
67 .transpose()
68 }
69
70 fn save_process_conn(
71 conn: &Connection,
72 record: &ProcessRecord,
73 ) -> Result<(), lash_core::PluginError> {
74 conn.execute(
75 "UPDATE processes
76 SET updated_at_ms = ?2, status = ?3, record_json = ?4
77 WHERE process_id = ?1",
78 params![
79 record.id.as_str(),
80 record.updated_at_ms as i64,
81 process_status_label(record),
82 process_encode_json(record)?
83 ],
84 )
85 .map_err(process_sqlite_error)?;
86 Ok(())
87 }
88
89 fn load_event_by_key_conn(
90 conn: &Connection,
91 process_id: &str,
92 replay_key: &str,
93 ) -> Result<Option<(String, ProcessEvent)>, lash_core::PluginError> {
94 let row: Option<(String, String)> = conn
95 .query_row(
96 "SELECT payload_hash, event_json
97 FROM process_events
98 WHERE process_id = ?1 AND idempotency_key = ?2",
99 params![process_id, replay_key],
100 |row| Ok((row.get(0)?, row.get(1)?)),
101 )
102 .optional()
103 .map_err(process_sqlite_error)?;
104 row.map(|(hash, json)| {
105 serde_json::from_str(&json)
106 .map(|event| (hash, event))
107 .map_err(process_decode_error)
108 })
109 .transpose()
110 }
111
112 fn load_process_lease_conn(
113 conn: &Connection,
114 process_id: &str,
115 ) -> Result<Option<ProcessLease>, lash_core::PluginError> {
116 conn.query_row(
117 "SELECT lease_owner_id, lease_token, lease_fencing_token,
118 lease_claimed_at_ms, lease_expires_at_ms
119 FROM process_leases
120 WHERE process_id = ?1",
121 params![process_id],
122 |row| {
123 let owner_id: Option<String> = row.get(0)?;
124 let lease_token: Option<String> = row.get(1)?;
125 let (Some(owner_id), Some(lease_token)) = (owner_id, lease_token) else {
126 return Ok(None);
127 };
128 Ok(Some(ProcessLease {
129 schema_version: PROCESS_LEASE_SCHEMA_VERSION,
130 process_id: process_id.to_string(),
131 owner_id,
132 lease_token,
133 fencing_token: row.get::<_, i64>(2)? as u64,
134 claimed_at_epoch_ms: row.get::<_, i64>(3)? as u64,
135 expires_at_epoch_ms: row.get::<_, i64>(4)? as u64,
136 }))
137 },
138 )
139 .optional()
140 .map(|lease| lease.flatten())
141 .map_err(process_sqlite_error)
142 }
143
144 fn list_grants_for_scope_conn(
145 conn: &Connection,
146 owner_scope: &ProcessScope,
147 live_only: bool,
148 ) -> Result<Vec<ProcessHandleGrantEntry>, lash_core::PluginError> {
149 let owner_scope_id = owner_scope.id();
150 let status_clause = if live_only {
151 "AND p.status = 'running'"
152 } else {
153 ""
154 };
155 let mut stmt = conn
156 .prepare(&format!(
157 "SELECT g.process_id, g.descriptor_json, p.record_json
158 FROM process_handle_grants g
159 JOIN processes p ON p.process_id = g.process_id
160 WHERE g.scope_id = ?1 {status_clause}
161 ORDER BY g.process_id ASC"
162 ))
163 .map_err(process_sqlite_error)?;
164 let rows = stmt
165 .query_map(params![owner_scope_id.as_str()], |row| {
166 Ok((
167 row.get::<_, String>(0)?,
168 row.get::<_, String>(1)?,
169 row.get::<_, String>(2)?,
170 ))
171 })
172 .map_err(process_sqlite_error)?;
173 let mut entries = Vec::new();
174 for row in rows {
175 let (process_id, descriptor_json, record_json) = row.map_err(process_sqlite_error)?;
176 let descriptor: ProcessHandleDescriptor =
177 serde_json::from_str(&descriptor_json).map_err(process_decode_error)?;
178 let record: ProcessRecord =
179 serde_json::from_str(&record_json).map_err(process_decode_error)?;
180 entries.push((
181 ProcessHandleGrant {
182 session_id: owner_scope.session_id.clone(),
183 process_id,
184 descriptor,
185 },
186 record,
187 ));
188 }
189 Ok(entries)
190 }
191}
192
193fn tx_outcome<T>(
198 result: Result<T, lash_core::PluginError>,
199) -> TxOutcome<Result<T, lash_core::PluginError>> {
200 match result {
201 Ok(value) => TxOutcome::Commit(Ok(value)),
202 Err(err) => TxOutcome::Rollback(Err(err)),
203 }
204}
205
206#[async_trait::async_trait]
207impl ProcessRegistry for SqliteProcessRegistry {
208 fn durability_tier(&self) -> DurabilityTier {
209 DurabilityTier::Durable
210 }
211
212 async fn register_process(
213 &self,
214 registration: ProcessRegistration,
215 ) -> Result<ProcessRecord, lash_core::PluginError> {
216 let (registration, registration_hash) = prepare_process_registration(registration)?;
217 self.conn
218 .write_flow(move |tx| {
219 Ok(tx_outcome((|| {
220 if let Some(existing) = Self::load_process_conn(tx, ®istration.id)? {
221 if existing.registration_hash == registration_hash {
222 return Ok(existing);
223 }
224 return Err(lash_core::PluginError::Session(format!(
225 "process `{}` registration hash conflict: existing {}, new {}",
226 registration.id, existing.registration_hash, registration_hash
227 )));
228 }
229 let now = current_epoch_ms();
230 let record = ProcessRecord::from_prepared_registration(
231 registration,
232 registration_hash,
233 now,
234 );
235 let owner_scope_id = record.owner_scope_id();
236 tx.execute(
237 "INSERT INTO processes (
238 process_id, registration_hash, owner_scope_id, host_profile_id,
239 created_at_ms, updated_at_ms, status, record_json
240 )
241 VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8)",
242 params![
243 record.id.as_str(),
244 record.registration_hash.as_str(),
245 owner_scope_id.as_str(),
246 record.host_profile_id(),
247 record.created_at_ms as i64,
248 record.updated_at_ms as i64,
249 process_status_label(&record),
250 process_encode_json(&record)?,
251 ],
252 )
253 .map_err(process_sqlite_error)?;
254 Ok(record)
255 })()))
256 })
257 .await
258 .map_err(process_sqlite_error)?
259 }
260
261 async fn set_external_ref(
262 &self,
263 process_id: &str,
264 external_ref: ProcessExternalRef,
265 ) -> Result<ProcessRecord, lash_core::PluginError> {
266 let process_id = process_id.to_string();
267 self.conn
268 .write_flow(move |tx| {
269 Ok(tx_outcome((|| {
270 let mut record =
271 Self::load_process_conn(tx, &process_id)?.ok_or_else(|| {
272 lash_core::PluginError::Session(format!(
273 "unknown process `{process_id}`"
274 ))
275 })?;
276 record.external_ref = Some(external_ref);
277 record.updated_at_ms = current_epoch_ms();
278 Self::save_process_conn(tx, &record)?;
279 Ok(record)
280 })()))
281 })
282 .await
283 .map_err(process_sqlite_error)?
284 }
285
286 async fn grant_handle(
287 &self,
288 owner_scope: &ProcessScope,
289 process_id: &str,
290 descriptor: ProcessHandleDescriptor,
291 ) -> Result<ProcessHandleGrant, lash_core::PluginError> {
292 let owner_scope = owner_scope.clone();
293 let process_id = process_id.to_string();
294 self.conn
295 .write_flow(move |tx| {
296 Ok(tx_outcome((|| {
297 let owner_scope_id = owner_scope.id();
298 if Self::load_process_conn(tx, &process_id)?.is_none() {
299 return Err(lash_core::PluginError::Session(format!(
300 "unknown process `{process_id}`"
301 )));
302 }
303 tx.execute(
304 "INSERT INTO process_handle_grants (session_id, scope_id, process_id, descriptor_json)
305 VALUES (?1, ?2, ?3, ?4)
306 ON CONFLICT(scope_id, process_id) DO UPDATE SET
307 session_id = excluded.session_id,
308 descriptor_json = excluded.descriptor_json",
309 params![
310 owner_scope.session_id.as_str(),
311 owner_scope_id.as_str(),
312 process_id.as_str(),
313 process_encode_json(&descriptor)?
314 ],
315 )
316 .map_err(process_sqlite_error)?;
317 Ok(ProcessHandleGrant {
318 session_id: owner_scope.session_id.clone(),
319 process_id: process_id.clone(),
320 descriptor,
321 })
322 })()))
323 })
324 .await
325 .map_err(process_sqlite_error)?
326 }
327
328 async fn revoke_handle(
329 &self,
330 owner_scope: &ProcessScope,
331 process_id: &str,
332 ) -> Result<(), lash_core::PluginError> {
333 let owner_scope_id = owner_scope.id().as_str().to_string();
334 let process_id = process_id.to_string();
335 self.conn
336 .call(move |conn| {
337 conn.execute(
338 "DELETE FROM process_handle_grants WHERE scope_id = ?1 AND process_id = ?2",
339 params![owner_scope_id, process_id],
340 )
341 })
342 .await
343 .map_err(process_sqlite_error)?;
344 Ok(())
345 }
346
347 async fn transfer_handle_grants(
348 &self,
349 from_scope: &ProcessScope,
350 to_scope: &ProcessScope,
351 process_ids: &[String],
352 ) -> Result<(), lash_core::PluginError> {
353 let from_scope = from_scope.clone();
354 let to_scope = to_scope.clone();
355 let process_ids = process_ids.to_vec();
356 self.conn
357 .write_flow(move |tx| {
358 Ok(tx_outcome((|| {
359 let from_scope_id = from_scope.id();
360 let to_scope_id = to_scope.id();
361 for process_id in &process_ids {
362 let descriptor_json: Option<String> = tx
363 .query_row(
364 "SELECT descriptor_json
365 FROM process_handle_grants
366 WHERE scope_id = ?1 AND process_id = ?2",
367 params![from_scope_id.as_str(), process_id.as_str()],
368 |row| row.get(0),
369 )
370 .optional()
371 .map_err(process_sqlite_error)?;
372 let Some(descriptor_json) = descriptor_json else {
373 return Err(lash_core::PluginError::Session(format!(
374 "process handle `{process_id}` is not granted to session `{}`",
375 from_scope.session_id
376 )));
377 };
378 tx.execute(
379 "DELETE FROM process_handle_grants
380 WHERE scope_id = ?1 AND process_id = ?2",
381 params![from_scope_id.as_str(), process_id.as_str()],
382 )
383 .map_err(process_sqlite_error)?;
384 tx.execute(
385 "INSERT INTO process_handle_grants (session_id, scope_id, process_id, descriptor_json)
386 VALUES (?1, ?2, ?3, ?4)
387 ON CONFLICT(scope_id, process_id) DO UPDATE SET
388 session_id = excluded.session_id,
389 descriptor_json = excluded.descriptor_json",
390 params![
391 to_scope.session_id.as_str(),
392 to_scope_id.as_str(),
393 process_id.as_str(),
394 descriptor_json
395 ],
396 )
397 .map_err(process_sqlite_error)?;
398 }
399 Ok(())
400 })()))
401 })
402 .await
403 .map_err(process_sqlite_error)?
404 }
405
406 async fn list_handle_grants(
407 &self,
408 owner_scope: &ProcessScope,
409 ) -> Result<Vec<ProcessHandleGrantEntry>, lash_core::PluginError> {
410 let owner_scope = owner_scope.clone();
411 self.conn
412 .call(move |conn| Ok(Self::list_grants_for_scope_conn(conn, &owner_scope, false)))
413 .await
414 .map_err(process_sqlite_error)?
415 }
416
417 async fn list_live_handle_grants(
418 &self,
419 owner_scope: &ProcessScope,
420 ) -> Result<Vec<ProcessHandleGrantEntry>, lash_core::PluginError> {
421 let owner_scope = owner_scope.clone();
422 self.conn
423 .call(move |conn| Ok(Self::list_grants_for_scope_conn(conn, &owner_scope, true)))
424 .await
425 .map_err(process_sqlite_error)?
426 }
427
428 async fn has_handle_grant(
429 &self,
430 owner_scope: &ProcessScope,
431 process_id: &str,
432 ) -> Result<bool, lash_core::PluginError> {
433 let owner_scope_id = owner_scope.id().as_str().to_string();
434 let process_id = process_id.to_string();
435 self.conn
436 .call(move |conn| {
437 let exists = conn
438 .query_row(
439 "SELECT 1
440 FROM process_handle_grants g
441 JOIN processes p ON p.process_id = g.process_id
442 WHERE g.scope_id = ?1 AND g.process_id = ?2
443 LIMIT 1",
444 params![owner_scope_id, process_id],
445 |_| Ok(()),
446 )
447 .optional()?
448 .is_some();
449 Ok(exists)
450 })
451 .await
452 .map_err(process_sqlite_error)
453 }
454
455 async fn handle_grants_for_process(
456 &self,
457 process_id: &str,
458 ) -> Result<Vec<ProcessHandleGrant>, lash_core::PluginError> {
459 let process_id = process_id.to_string();
460 self.conn
461 .call(move |conn| {
462 Ok((|| {
463 if Self::load_process_conn(conn, &process_id)?.is_none() {
464 return Err(lash_core::PluginError::Session(format!(
465 "unknown process `{process_id}`"
466 )));
467 }
468 let mut stmt = conn
469 .prepare(
470 "SELECT session_id, descriptor_json
471 FROM process_handle_grants
472 WHERE process_id = ?1
473 ORDER BY session_id ASC, scope_id ASC",
474 )
475 .map_err(process_sqlite_error)?;
476 let rows = stmt
477 .query_map(params![process_id], |row| {
478 Ok((row.get::<_, String>(0)?, row.get::<_, String>(1)?))
479 })
480 .map_err(process_sqlite_error)?;
481 let mut grants = Vec::new();
482 for row in rows {
483 let (session_id, descriptor_json) = row.map_err(process_sqlite_error)?;
484 let descriptor: ProcessHandleDescriptor =
485 serde_json::from_str(&descriptor_json).map_err(process_decode_error)?;
486 grants.push(ProcessHandleGrant {
487 session_id,
488 process_id: process_id.clone(),
489 descriptor,
490 });
491 }
492 Ok(grants)
493 })())
494 })
495 .await
496 .map_err(process_sqlite_error)?
497 }
498
499 async fn delete_session_process_state(
500 &self,
501 session_id: &str,
502 ) -> Result<lash_core::ProcessSessionDeleteReport, lash_core::PluginError> {
503 let session_id_owned = session_id.to_string();
504 let (revoked_handle_count, mut cancel_process_ids, mut preserved_process_ids) = self
505 .conn
506 .write_flow(move |tx| {
507 Ok(tx_outcome((|| {
508 let session_id = session_id_owned;
509 let removed = {
510 let mut stmt = tx
511 .prepare(
512 "SELECT g.process_id, p.record_json
513 FROM process_handle_grants g
514 JOIN processes p ON p.process_id = g.process_id
515 WHERE g.session_id = ?1
516 ORDER BY g.process_id ASC",
517 )
518 .map_err(process_sqlite_error)?;
519 let rows = stmt
520 .query_map(params![session_id], |row| {
521 Ok((row.get::<_, String>(0)?, row.get::<_, String>(1)?))
522 })
523 .map_err(process_sqlite_error)?;
524 let mut removed = Vec::new();
525 for row in rows {
526 let (process_id, record_json) = row.map_err(process_sqlite_error)?;
527 let record: ProcessRecord =
528 serde_json::from_str(&record_json).map_err(process_decode_error)?;
529 removed.push((process_id, record));
530 }
531 removed
532 };
533
534 let revoked_handle_count = tx
535 .execute(
536 "DELETE FROM process_handle_grants WHERE session_id = ?1",
537 params![session_id],
538 )
539 .map_err(process_sqlite_error)?;
540 let mut cancel_process_ids = Vec::new();
541 let mut preserved_process_ids = Vec::new();
542 for (process_id, record) in removed {
543 if record.is_terminal() {
544 continue;
545 }
546 let remaining_grants: i64 = tx
547 .query_row(
548 "SELECT COUNT(*) FROM process_handle_grants WHERE process_id = ?1",
549 params![process_id],
550 |row| row.get(0),
551 )
552 .map_err(process_sqlite_error)?;
553 if remaining_grants == 0 {
554 cancel_process_ids.push(process_id);
555 } else {
556 preserved_process_ids.push(process_id);
557 }
558 }
559 Ok((
560 revoked_handle_count,
561 cancel_process_ids,
562 preserved_process_ids,
563 ))
564 })()))
565 })
566 .await
567 .map_err(process_sqlite_error)??;
568 cancel_process_ids.sort();
569 cancel_process_ids.dedup();
570 preserved_process_ids.sort();
571 preserved_process_ids.dedup();
572 Ok(lash_core::ProcessSessionDeleteReport {
573 session_id: session_id.to_string(),
574 revoked_handle_count,
575 deleted_wake_count: 0,
576 cancel_process_ids,
577 preserved_process_ids,
578 })
579 }
580
581 async fn append_event(
582 &self,
583 process_id: &str,
584 request: ProcessEventAppendRequest,
585 ) -> Result<ProcessEventAppendResult, lash_core::PluginError> {
586 let process_id = process_id.to_string();
587 let (result, appended) = self
588 .conn
589 .write_flow(move |tx| {
590 Ok(tx_outcome((|| {
591 let mut record =
592 Self::load_process_conn(tx, &process_id)?.ok_or_else(|| {
593 lash_core::PluginError::Session(format!(
594 "unknown process `{process_id}`"
595 ))
596 })?;
597 let replay_lookup = if let Some(replay_key) =
598 request.replay.as_ref().map(|replay| replay.key.as_str())
599 {
600 Self::load_event_by_key_conn(tx, &process_id, replay_key)?
601 } else {
602 None
603 };
604 let sequence = tx
605 .query_row(
606 "SELECT COALESCE(MAX(sequence), 0) + 1 FROM process_events WHERE process_id = ?1",
607 params![process_id],
608 |row| row.get::<_, i64>(0),
609 )
610 .map_err(process_sqlite_error)? as u64;
611 let occurred_at_ms = current_epoch_ms();
612 let prepared = prepare_process_event_append(
613 &record,
614 request,
615 sequence,
616 replay_lookup,
617 occurred_at_ms,
618 )?;
619 if prepared.replayed {
620 return Ok((
621 ProcessEventAppendResult {
622 event: prepared.event,
623 wake_delivery: prepared.wake_delivery,
624 },
625 false,
626 ));
627 }
628 let event = prepared.event;
629 tx.execute(
630 "INSERT INTO process_events (
631 process_id, sequence, event_type, payload_hash, idempotency_key,
632 occurred_at_ms, event_json
633 )
634 VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7)",
635 params![
636 process_id,
637 sequence as i64,
638 event.event_type.as_str(),
639 prepared.payload_hash.as_str(),
640 event.invocation.replay_key(),
641 prepared.occurred_at_ms as i64,
642 process_encode_json(&event)?,
643 ],
644 )
645 .map_err(process_sqlite_error)?;
646 if let Some(status) = prepared.status_update.clone() {
647 record.status = status;
648 }
649 record.updated_at_ms = prepared.occurred_at_ms;
650 Self::save_process_conn(tx, &record)?;
651 Ok((
652 ProcessEventAppendResult {
653 event,
654 wake_delivery: prepared.wake_delivery,
655 },
656 true,
657 ))
658 })()))
659 })
660 .await
661 .map_err(process_sqlite_error)??;
662 if appended {
663 self.notify.notify_waiters();
664 }
665 Ok(result)
666 }
667
668 async fn events_after(
669 &self,
670 process_id: &str,
671 after_sequence: u64,
672 ) -> Result<Vec<ProcessEvent>, lash_core::PluginError> {
673 let process_id = process_id.to_string();
674 self.conn
675 .call(move |conn| {
676 Ok((|| {
677 if Self::load_process_conn(conn, &process_id)?.is_none() {
678 return Err(lash_core::PluginError::Session(format!(
679 "unknown process `{process_id}`"
680 )));
681 }
682 let mut stmt = conn
683 .prepare(
684 "SELECT event_json FROM process_events
685 WHERE process_id = ?1 AND sequence > ?2
686 ORDER BY sequence ASC",
687 )
688 .map_err(process_sqlite_error)?;
689 let rows = stmt
690 .query_map(params![process_id, after_sequence as i64], |row| {
691 row.get::<_, String>(0)
692 })
693 .map_err(process_sqlite_error)?;
694 let mut events = Vec::new();
695 for row in rows {
696 events.push(
697 serde_json::from_str(&row.map_err(process_sqlite_error)?)
698 .map_err(process_decode_error)?,
699 );
700 }
701 Ok(events)
702 })())
703 })
704 .await
705 .map_err(process_sqlite_error)?
706 }
707
708 async fn wake_events_after(
709 &self,
710 process_id: &str,
711 after_sequence: u64,
712 ) -> Result<Vec<ProcessEvent>, lash_core::PluginError> {
713 let acked: std::collections::HashSet<u64> = {
714 let process_id = process_id.to_string();
715 self.conn
716 .call(move |conn| {
717 Ok(
718 (|| -> Result<std::collections::HashSet<u64>, lash_core::PluginError> {
719 let mut stmt = conn
720 .prepare(
721 "SELECT sequence FROM process_wake_acks WHERE process_id = ?1",
722 )
723 .map_err(process_sqlite_error)?;
724 let rows = stmt
725 .query_map(params![process_id], |row| row.get::<_, i64>(0))
726 .map_err(process_sqlite_error)?;
727 let mut set = std::collections::HashSet::new();
728 for row in rows {
729 set.insert(row.map_err(process_sqlite_error)? as u64);
730 }
731 Ok(set)
732 })(),
733 )
734 })
735 .await
736 .map_err(process_sqlite_error)??
737 };
738 Ok(self
739 .events_after(process_id, after_sequence)
740 .await?
741 .into_iter()
742 .filter(|event| event.semantics.wake.is_some() && !acked.contains(&event.sequence))
743 .collect())
744 }
745
746 async fn wait_event_after(
747 &self,
748 process_id: &str,
749 event_type: &str,
750 after_sequence: u64,
751 ) -> Result<ProcessEvent, lash_core::PluginError> {
752 loop {
753 if let Some(event) = self
754 .events_after(process_id, after_sequence)
755 .await?
756 .into_iter()
757 .find(|event| event.event_type == event_type)
758 {
759 return Ok(event);
760 }
761 tokio::select! {
762 _ = self.notify.notified() => {}
763 _ = tokio::time::sleep(Duration::from_millis(50)) => {}
764 }
765 }
766 }
767
768 async fn await_process(
769 &self,
770 process_id: &str,
771 ) -> Result<ProcessAwaitOutput, lash_core::PluginError> {
772 loop {
773 let record = self.get_process(process_id).await.ok_or_else(|| {
774 lash_core::PluginError::Session(format!("unknown process `{process_id}`"))
775 })?;
776 if let Some(await_output) = record.status.await_output() {
777 return Ok(await_output.clone());
778 }
779 tokio::select! {
780 _ = self.notify.notified() => {}
781 _ = tokio::time::sleep(Duration::from_millis(50)) => {}
782 }
783 }
784 }
785
786 async fn complete_process(
787 &self,
788 process_id: &str,
789 await_output: ProcessAwaitOutput,
790 ) -> Result<ProcessRecord, lash_core::PluginError> {
791 let event_type = match await_output.terminal_state() {
792 lash_core::ProcessTerminalState::Completed => "process.completed",
793 lash_core::ProcessTerminalState::Failed => "process.failed",
794 lash_core::ProcessTerminalState::Cancelled => "process.cancelled",
795 };
796 self.append_event(
797 process_id,
798 ProcessEventAppendRequest::new(
799 event_type,
800 serde_json::json!({ "await_output": await_output }),
801 )
802 .with_replay_key(format!("process:{process_id}:terminal:{event_type}")),
803 )
804 .await?;
805 self.get_process(process_id).await.ok_or_else(|| {
806 lash_core::PluginError::Session(format!(
807 "unknown process `{process_id}` after terminal event"
808 ))
809 })
810 }
811
812 async fn get_process(&self, process_id: &str) -> Option<ProcessRecord> {
813 let process_id = process_id.to_string();
814 self.conn
815 .call(move |conn| Ok(Self::load_process_conn(conn, &process_id).ok().flatten()))
816 .await
817 .ok()
818 .flatten()
819 }
820
821 async fn ack_wake(
822 &self,
823 process_id: &str,
824 sequence: u64,
825 ) -> Result<(), lash_core::PluginError> {
826 let process_id = process_id.to_string();
827 self.conn
828 .call(move |conn| {
829 Ok((|| {
830 if Self::load_process_conn(conn, &process_id)?.is_none() {
831 return Err(lash_core::PluginError::Session(format!(
832 "unknown process `{process_id}`"
833 )));
834 }
835 conn.execute(
836 "INSERT OR IGNORE INTO process_wake_acks (process_id, sequence) VALUES (?1, ?2)",
837 params![process_id, sequence as i64],
838 )
839 .map_err(process_sqlite_error)?;
840 Ok(())
841 })())
842 })
843 .await
844 .map_err(process_sqlite_error)?
845 }
846
847 async fn list_non_terminal(&self) -> Result<Vec<ProcessRecord>, lash_core::PluginError> {
848 self.conn
849 .call(move |conn| {
850 Ok((|| {
851 let mut stmt = conn
852 .prepare(
853 "SELECT record_json FROM processes
854 WHERE status = 'running'
855 ORDER BY process_id ASC",
856 )
857 .map_err(process_sqlite_error)?;
858 let rows = stmt
859 .query_map([], |row| row.get::<_, String>(0))
860 .map_err(process_sqlite_error)?;
861 let mut records = Vec::new();
862 for row in rows {
863 let record: ProcessRecord =
864 serde_json::from_str(&row.map_err(process_sqlite_error)?)
865 .map_err(process_decode_error)?;
866 records.push(record);
867 }
868 Ok(records)
869 })())
870 })
871 .await
872 .map_err(process_sqlite_error)?
873 }
874
875 async fn claim_process_lease(
876 &self,
877 process_id: &str,
878 owner_id: &str,
879 lease_ttl_ms: u64,
880 ) -> Result<ProcessLease, lash_core::PluginError> {
881 let process_id = process_id.to_string();
882 let owner_id = owner_id.to_string();
883 self.conn
884 .write_flow(move |tx| {
885 Ok(tx_outcome((|| {
886 if Self::load_process_conn(tx, &process_id)?.is_none() {
887 return Err(lash_core::PluginError::Session(format!(
888 "unknown process `{process_id}`"
889 )));
890 }
891 let now = current_epoch_ms();
892 let current = Self::load_process_lease_conn(tx, &process_id)?;
893 if let Some(current) = current.as_ref()
894 && current.expires_at_epoch_ms > now
895 && current.owner_id != owner_id
896 {
897 return Err(process_lease_conflict(&process_id, current));
898 }
899 let fencing_token: u64 = tx
904 .query_row(
905 "SELECT lease_fencing_token FROM process_leases WHERE process_id = ?1",
906 params![process_id],
907 |row| row.get::<_, i64>(0),
908 )
909 .optional()
910 .map_err(process_sqlite_error)?
911 .unwrap_or(0) as u64
912 + 1;
913 let lease = ProcessLease {
914 schema_version: PROCESS_LEASE_SCHEMA_VERSION,
915 process_id: process_id.clone(),
916 owner_id: owner_id.clone(),
917 lease_token: format!(
918 "{:x}",
919 Sha256::digest(
920 format!("{process_id}:{owner_id}:{now}:{fencing_token}").as_bytes()
921 )
922 ),
923 fencing_token,
924 claimed_at_epoch_ms: now,
925 expires_at_epoch_ms: now.saturating_add(lease_ttl_ms),
926 };
927 tx.execute(
928 "INSERT INTO process_leases (
929 process_id, lease_owner_id, lease_token, lease_fencing_token,
930 lease_claimed_at_ms, lease_expires_at_ms
931 )
932 VALUES (?1, ?2, ?3, ?4, ?5, ?6)
933 ON CONFLICT(process_id) DO UPDATE SET
934 lease_owner_id = excluded.lease_owner_id,
935 lease_token = excluded.lease_token,
936 lease_fencing_token = excluded.lease_fencing_token,
937 lease_claimed_at_ms = excluded.lease_claimed_at_ms,
938 lease_expires_at_ms = excluded.lease_expires_at_ms",
939 params![
940 lease.process_id.as_str(),
941 lease.owner_id.as_str(),
942 lease.lease_token.as_str(),
943 lease.fencing_token as i64,
944 lease.claimed_at_epoch_ms as i64,
945 lease.expires_at_epoch_ms as i64,
946 ],
947 )
948 .map_err(process_sqlite_error)?;
949 Ok(lease)
950 })()))
951 })
952 .await
953 .map_err(process_sqlite_error)?
954 }
955
956 async fn renew_process_lease(
957 &self,
958 lease: &ProcessLease,
959 lease_ttl_ms: u64,
960 ) -> Result<ProcessLease, lash_core::PluginError> {
961 let lease = lease.clone();
962 self.conn
963 .write_flow(move |tx| {
964 Ok(tx_outcome((|| {
965 let now = current_epoch_ms();
966 let current = Self::load_process_lease_conn(tx, &lease.process_id)?;
967 if !guard_lease(current.as_ref(), &lease.lease_token, now) {
968 return Err(process_lease_expired(&lease.process_id));
969 }
970 let renewed = ProcessLease {
971 expires_at_epoch_ms: now.saturating_add(lease_ttl_ms),
972 ..lease.clone()
973 };
974 tx.execute(
975 "UPDATE process_leases
976 SET lease_expires_at_ms = ?2
977 WHERE process_id = ?1 AND lease_token = ?3",
978 params![
979 renewed.process_id.as_str(),
980 renewed.expires_at_epoch_ms as i64,
981 renewed.lease_token.as_str(),
982 ],
983 )
984 .map_err(process_sqlite_error)?;
985 Ok(renewed)
986 })()))
987 })
988 .await
989 .map_err(process_sqlite_error)?
990 }
991
992 async fn complete_process_lease(
993 &self,
994 completion: &ProcessLeaseCompletion,
995 ) -> Result<(), lash_core::PluginError> {
996 let process_id = completion.process_id.clone();
997 let lease_token = completion.lease_token.clone();
998 self.conn
999 .call(move |conn| {
1000 conn.execute(
1001 "UPDATE process_leases
1002 SET lease_owner_id = NULL,
1003 lease_token = NULL,
1004 lease_claimed_at_ms = 0,
1005 lease_expires_at_ms = 0
1006 WHERE process_id = ?1 AND lease_token = ?2",
1007 params![process_id, lease_token],
1008 )
1009 })
1010 .await
1011 .map_err(process_sqlite_error)?;
1012 Ok(())
1013 }
1014}
1015
1016fn process_lease_conflict(process_id: &str, current: &ProcessLease) -> lash_core::PluginError {
1019 lash_core::PluginError::Session(format!(
1020 "process `{process_id}` is already leased by `{}` until {}",
1021 current.owner_id, current.expires_at_epoch_ms
1022 ))
1023}
1024
1025fn process_lease_expired(process_id: &str) -> lash_core::PluginError {
1027 lash_core::PluginError::Session(format!(
1028 "process lease for `{process_id}` is missing or expired"
1029 ))
1030}