1use super::*;
28
29fn process_status_label(record: &ProcessRecord) -> &'static str {
30 record.status.label()
31}
32
33impl SqliteProcessRegistry {
34 pub async fn open(path: &Path) -> tokio_rusqlite::Result<Self> {
35 let conn = SqliteConnection::open(path).await?;
36 ensure_process_schema(&conn).await?;
37 apply_pragmas(&conn, StoreBacking::File).await?;
38 Ok(Self {
39 conn,
40 notify: tokio::sync::Notify::new(),
41 })
42 }
43
44 pub async fn memory() -> tokio_rusqlite::Result<Self> {
45 let conn = SqliteConnection::open_in_memory().await?;
46 ensure_process_schema(&conn).await?;
47 apply_pragmas(&conn, StoreBacking::Memory).await?;
48 Ok(Self {
49 conn,
50 notify: tokio::sync::Notify::new(),
51 })
52 }
53
54 fn load_process_conn(
55 conn: &Connection,
56 process_id: &str,
57 ) -> Result<Option<ProcessRecord>, lash_core::PluginError> {
58 let json: Option<String> = conn
59 .query_row(
60 "SELECT record_json FROM processes WHERE process_id = ?1",
61 params![process_id],
62 |row| row.get(0),
63 )
64 .optional()
65 .map_err(process_sqlite_error)?;
66 json.map(|json| serde_json::from_str(&json).map_err(process_decode_error))
67 .transpose()
68 }
69
70 fn save_process_conn(
71 conn: &Connection,
72 record: &ProcessRecord,
73 ) -> Result<(), lash_core::PluginError> {
74 conn.execute(
75 "UPDATE processes
76 SET updated_at_ms = ?2, status = ?3, record_json = ?4
77 WHERE process_id = ?1",
78 params![
79 record.id.as_str(),
80 record.updated_at_ms as i64,
81 process_status_label(record),
82 process_encode_json(record)?
83 ],
84 )
85 .map_err(process_sqlite_error)?;
86 Ok(())
87 }
88
89 fn load_event_by_key_conn(
90 conn: &Connection,
91 process_id: &str,
92 replay_key: &str,
93 ) -> Result<Option<(String, ProcessEvent)>, lash_core::PluginError> {
94 let row: Option<(String, String)> = conn
95 .query_row(
96 "SELECT payload_hash, event_json
97 FROM process_events
98 WHERE process_id = ?1 AND idempotency_key = ?2",
99 params![process_id, replay_key],
100 |row| Ok((row.get(0)?, row.get(1)?)),
101 )
102 .optional()
103 .map_err(process_sqlite_error)?;
104 row.map(|(hash, json)| {
105 serde_json::from_str(&json)
106 .map(|event| (hash, event))
107 .map_err(process_decode_error)
108 })
109 .transpose()
110 }
111
112 fn load_process_lease_conn(
113 conn: &Connection,
114 process_id: &str,
115 ) -> Result<Option<ProcessLease>, lash_core::PluginError> {
116 conn.query_row(
117 "SELECT lease_owner_id, lease_token, lease_fencing_token,
118 lease_claimed_at_ms, lease_expires_at_ms
119 FROM process_leases
120 WHERE process_id = ?1",
121 params![process_id],
122 |row| {
123 let owner_id: Option<String> = row.get(0)?;
124 let lease_token: Option<String> = row.get(1)?;
125 let (Some(owner_id), Some(lease_token)) = (owner_id, lease_token) else {
126 return Ok(None);
127 };
128 Ok(Some(ProcessLease {
129 schema_version: PROCESS_LEASE_SCHEMA_VERSION,
130 process_id: process_id.to_string(),
131 owner_id,
132 lease_token,
133 fencing_token: row.get::<_, i64>(2)? as u64,
134 claimed_at_epoch_ms: row.get::<_, i64>(3)? as u64,
135 expires_at_epoch_ms: row.get::<_, i64>(4)? as u64,
136 }))
137 },
138 )
139 .optional()
140 .map(|lease| lease.flatten())
141 .map_err(process_sqlite_error)
142 }
143
144 fn list_grants_for_scope_conn(
145 conn: &Connection,
146 session_scope: &SessionScope,
147 live_only: bool,
148 ) -> Result<Vec<ProcessHandleGrantEntry>, lash_core::PluginError> {
149 let session_scope_id = session_scope.id();
150 let status_clause = if live_only {
151 "AND p.status = 'running'"
152 } else {
153 ""
154 };
155 let mut stmt = conn
156 .prepare(&format!(
157 "SELECT g.process_id, g.descriptor_json, p.record_json
158 FROM process_handle_grants g
159 JOIN processes p ON p.process_id = g.process_id
160 WHERE g.scope_id = ?1 {status_clause}
161 ORDER BY g.process_id ASC"
162 ))
163 .map_err(process_sqlite_error)?;
164 let rows = stmt
165 .query_map(params![session_scope_id.as_str()], |row| {
166 Ok((
167 row.get::<_, String>(0)?,
168 row.get::<_, String>(1)?,
169 row.get::<_, String>(2)?,
170 ))
171 })
172 .map_err(process_sqlite_error)?;
173 let mut entries = Vec::new();
174 for row in rows {
175 let (process_id, descriptor_json, record_json) = row.map_err(process_sqlite_error)?;
176 let descriptor: ProcessHandleDescriptor =
177 serde_json::from_str(&descriptor_json).map_err(process_decode_error)?;
178 let record: ProcessRecord =
179 serde_json::from_str(&record_json).map_err(process_decode_error)?;
180 entries.push((
181 ProcessHandleGrant {
182 session_id: session_scope.session_id.clone(),
183 process_id,
184 descriptor,
185 },
186 record,
187 ));
188 }
189 Ok(entries)
190 }
191}
192
193fn tx_outcome<T>(
198 result: Result<T, lash_core::PluginError>,
199) -> TxOutcome<Result<T, lash_core::PluginError>> {
200 match result {
201 Ok(value) => TxOutcome::Commit(Ok(value)),
202 Err(err) => TxOutcome::Rollback(Err(err)),
203 }
204}
205
206#[async_trait::async_trait]
207impl ProcessRegistry for SqliteProcessRegistry {
    /// Reports this registry's durability tier; always `DurabilityTier::Durable`.
    fn durability_tier(&self) -> DurabilityTier {
        DurabilityTier::Durable
    }
211
212 async fn register_process(
213 &self,
214 registration: ProcessRegistration,
215 ) -> Result<ProcessRecord, lash_core::PluginError> {
216 let (registration, registration_hash) = prepare_process_registration(registration)?;
217 let record = self
218 .conn
219 .write_flow(move |tx| {
220 Ok(tx_outcome((|| {
221 if let Some(existing) = Self::load_process_conn(tx, ®istration.id)? {
222 if existing.registration_hash == registration_hash {
223 return Ok(existing);
224 }
225 return Err(lash_core::PluginError::Session(format!(
226 "process `{}` registration hash conflict: existing {}, new {}",
227 registration.id, existing.registration_hash, registration_hash
228 )));
229 }
230 let now = current_epoch_ms();
231 let record = ProcessRecord::from_prepared_registration(
232 registration,
233 registration_hash,
234 now,
235 );
236 let originator_scope_id = record.originator_scope_id();
237 tx.execute(
238 "INSERT INTO processes (
239 process_id, registration_hash, owner_scope_id, host_profile_id,
240 created_at_ms, updated_at_ms, status, record_json
241 )
242 VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8)",
243 params![
244 record.id.as_str(),
245 record.registration_hash.as_str(),
246 originator_scope_id.as_str(),
247 record.host_profile_id(),
248 record.created_at_ms as i64,
249 record.updated_at_ms as i64,
250 process_status_label(&record),
251 process_encode_json(&record)?,
252 ],
253 )
254 .map_err(process_sqlite_error)?;
255 Ok(record)
256 })()))
257 })
258 .await
259 .map_err(process_sqlite_error)??;
260 self.notify.notify_waiters();
261 Ok(record)
262 }
263
264 async fn set_external_ref(
265 &self,
266 process_id: &str,
267 external_ref: ProcessExternalRef,
268 ) -> Result<ProcessRecord, lash_core::PluginError> {
269 let process_id = process_id.to_string();
270 let record = self
271 .conn
272 .write_flow(move |tx| {
273 Ok(tx_outcome((|| {
274 let mut record =
275 Self::load_process_conn(tx, &process_id)?.ok_or_else(|| {
276 lash_core::PluginError::Session(format!(
277 "unknown process `{process_id}`"
278 ))
279 })?;
280 record.external_ref = Some(external_ref);
281 record.updated_at_ms = current_epoch_ms();
282 Self::save_process_conn(tx, &record)?;
283 Ok(record)
284 })()))
285 })
286 .await
287 .map_err(process_sqlite_error)??;
288 self.notify.notify_waiters();
289 Ok(record)
290 }
291
292 async fn grant_handle(
293 &self,
294 session_scope: &SessionScope,
295 process_id: &str,
296 descriptor: ProcessHandleDescriptor,
297 ) -> Result<ProcessHandleGrant, lash_core::PluginError> {
298 let session_scope = session_scope.clone();
299 let process_id = process_id.to_string();
300 self.conn
301 .write_flow(move |tx| {
302 Ok(tx_outcome((|| {
303 let session_scope_id = session_scope.id();
304 if Self::load_process_conn(tx, &process_id)?.is_none() {
305 return Err(lash_core::PluginError::Session(format!(
306 "unknown process `{process_id}`"
307 )));
308 }
309 tx.execute(
310 "INSERT INTO process_handle_grants (session_id, scope_id, process_id, descriptor_json)
311 VALUES (?1, ?2, ?3, ?4)
312 ON CONFLICT(scope_id, process_id) DO UPDATE SET
313 session_id = excluded.session_id,
314 descriptor_json = excluded.descriptor_json",
315 params![
316 session_scope.session_id.as_str(),
317 session_scope_id.as_str(),
318 process_id.as_str(),
319 process_encode_json(&descriptor)?
320 ],
321 )
322 .map_err(process_sqlite_error)?;
323 Ok(ProcessHandleGrant {
324 session_id: session_scope.session_id.clone(),
325 process_id: process_id.clone(),
326 descriptor,
327 })
328 })()))
329 })
330 .await
331 .map_err(process_sqlite_error)?
332 }
333
334 async fn revoke_handle(
335 &self,
336 session_scope: &SessionScope,
337 process_id: &str,
338 ) -> Result<(), lash_core::PluginError> {
339 let session_scope_id = session_scope.id().as_str().to_string();
340 let process_id = process_id.to_string();
341 self.conn
342 .call(move |conn| {
343 conn.execute(
344 "DELETE FROM process_handle_grants WHERE scope_id = ?1 AND process_id = ?2",
345 params![session_scope_id, process_id],
346 )
347 })
348 .await
349 .map_err(process_sqlite_error)?;
350 Ok(())
351 }
352
    /// Moves the handle grants for `process_ids` from `from_scope` to
    /// `to_scope` inside a single write flow.
    ///
    /// For each process id the grant's descriptor is read from `from_scope`,
    /// the source row is deleted, and the descriptor is upserted under
    /// `to_scope` (overwriting any grant the target already holds for that
    /// process).
    ///
    /// # Errors
    ///
    /// Returns `PluginError::Session` as soon as one listed process has no
    /// grant in `from_scope`. The error goes through `tx_outcome`, which maps
    /// it to `TxOutcome::Rollback`.
    async fn transfer_handle_grants(
        &self,
        from_scope: &SessionScope,
        to_scope: &SessionScope,
        process_ids: &[String],
    ) -> Result<(), lash_core::PluginError> {
        let from_scope = from_scope.clone();
        let to_scope = to_scope.clone();
        let process_ids = process_ids.to_vec();
        self.conn
            .write_flow(move |tx| {
                Ok(tx_outcome((|| {
                    let from_scope_id = from_scope.id();
                    let to_scope_id = to_scope.id();
                    for process_id in &process_ids {
                        // Read the descriptor first so it can be re-inserted
                        // under the target scope after the source row is gone.
                        let descriptor_json: Option<String> = tx
                            .query_row(
                                "SELECT descriptor_json
                                 FROM process_handle_grants
                                 WHERE scope_id = ?1 AND process_id = ?2",
                                params![from_scope_id.as_str(), process_id.as_str()],
                                |row| row.get(0),
                            )
                            .optional()
                            .map_err(process_sqlite_error)?;
                        let Some(descriptor_json) = descriptor_json else {
                            return Err(lash_core::PluginError::Session(format!(
                                "process handle `{process_id}` is not granted to session `{}`",
                                from_scope.session_id
                            )));
                        };
                        tx.execute(
                            "DELETE FROM process_handle_grants
                             WHERE scope_id = ?1 AND process_id = ?2",
                            params![from_scope_id.as_str(), process_id.as_str()],
                        )
                        .map_err(process_sqlite_error)?;
                        // Upsert: a grant the target scope already holds for
                        // this process is replaced by the transferred one.
                        tx.execute(
                            "INSERT INTO process_handle_grants (session_id, scope_id, process_id, descriptor_json)
                             VALUES (?1, ?2, ?3, ?4)
                             ON CONFLICT(scope_id, process_id) DO UPDATE SET
                                session_id = excluded.session_id,
                                descriptor_json = excluded.descriptor_json",
                            params![
                                to_scope.session_id.as_str(),
                                to_scope_id.as_str(),
                                process_id.as_str(),
                                descriptor_json
                            ],
                        )
                        .map_err(process_sqlite_error)?;
                    }
                    Ok(())
                })()))
            })
            .await
            .map_err(process_sqlite_error)?
    }
411
412 async fn list_handle_grants(
413 &self,
414 session_scope: &SessionScope,
415 ) -> Result<Vec<ProcessHandleGrantEntry>, lash_core::PluginError> {
416 let session_scope = session_scope.clone();
417 self.conn
418 .call(move |conn| {
419 Ok(Self::list_grants_for_scope_conn(
420 conn,
421 &session_scope,
422 false,
423 ))
424 })
425 .await
426 .map_err(process_sqlite_error)?
427 }
428
429 async fn list_live_handle_grants(
430 &self,
431 session_scope: &SessionScope,
432 ) -> Result<Vec<ProcessHandleGrantEntry>, lash_core::PluginError> {
433 let session_scope = session_scope.clone();
434 self.conn
435 .call(move |conn| Ok(Self::list_grants_for_scope_conn(conn, &session_scope, true)))
436 .await
437 .map_err(process_sqlite_error)?
438 }
439
440 async fn has_handle_grant(
441 &self,
442 session_scope: &SessionScope,
443 process_id: &str,
444 ) -> Result<bool, lash_core::PluginError> {
445 let session_scope_id = session_scope.id().as_str().to_string();
446 let process_id = process_id.to_string();
447 self.conn
448 .call(move |conn| {
449 let exists = conn
450 .query_row(
451 "SELECT 1
452 FROM process_handle_grants g
453 JOIN processes p ON p.process_id = g.process_id
454 WHERE g.scope_id = ?1 AND g.process_id = ?2
455 LIMIT 1",
456 params![session_scope_id, process_id],
457 |_| Ok(()),
458 )
459 .optional()?
460 .is_some();
461 Ok(exists)
462 })
463 .await
464 .map_err(process_sqlite_error)
465 }
466
467 async fn handle_grants_for_process(
468 &self,
469 process_id: &str,
470 ) -> Result<Vec<ProcessHandleGrant>, lash_core::PluginError> {
471 let process_id = process_id.to_string();
472 self.conn
473 .call(move |conn| {
474 Ok((|| {
475 if Self::load_process_conn(conn, &process_id)?.is_none() {
476 return Err(lash_core::PluginError::Session(format!(
477 "unknown process `{process_id}`"
478 )));
479 }
480 let mut stmt = conn
481 .prepare(
482 "SELECT session_id, descriptor_json
483 FROM process_handle_grants
484 WHERE process_id = ?1
485 ORDER BY session_id ASC, scope_id ASC",
486 )
487 .map_err(process_sqlite_error)?;
488 let rows = stmt
489 .query_map(params![process_id], |row| {
490 Ok((row.get::<_, String>(0)?, row.get::<_, String>(1)?))
491 })
492 .map_err(process_sqlite_error)?;
493 let mut grants = Vec::new();
494 for row in rows {
495 let (session_id, descriptor_json) = row.map_err(process_sqlite_error)?;
496 let descriptor: ProcessHandleDescriptor =
497 serde_json::from_str(&descriptor_json).map_err(process_decode_error)?;
498 grants.push(ProcessHandleGrant {
499 session_id,
500 process_id: process_id.clone(),
501 descriptor,
502 });
503 }
504 Ok(grants)
505 })())
506 })
507 .await
508 .map_err(process_sqlite_error)?
509 }
510
    /// Revokes every handle grant belonging to `session_id` and reports which
    /// non-terminal processes were left without any grant.
    ///
    /// Inside one write flow the method:
    ///
    /// 1. snapshots the `(process_id, record)` pairs currently granted to the
    ///    session;
    /// 2. deletes all of the session's grant rows;
    /// 3. for each snapshotted process that is not terminal, counts the grants
    ///    that remain and classifies the process as orphaned (none left) or
    ///    preserved (some other grant remains).
    ///
    /// Terminal processes appear in neither list. Both lists are sorted and
    /// de-duplicated before being returned.
    async fn delete_session_process_state(
        &self,
        session_id: &str,
    ) -> Result<lash_core::ProcessSessionDeleteReport, lash_core::PluginError> {
        let session_id_owned = session_id.to_string();
        let (
            revoked_handle_count,
            deleted_wake_count,
            mut orphaned_process_ids,
            mut preserved_process_ids,
        ) = self
            .conn
            .write_flow(move |tx| {
                Ok(tx_outcome((|| {
                    let session_id = session_id_owned;
                    // Snapshot the affected processes before the grants are
                    // deleted; the statement is scoped so it is dropped before
                    // the writes below.
                    let removed = {
                        let mut stmt = tx
                            .prepare(
                                "SELECT g.process_id, p.record_json
                                 FROM process_handle_grants g
                                 JOIN processes p ON p.process_id = g.process_id
                                 WHERE g.session_id = ?1
                                 ORDER BY g.process_id ASC",
                            )
                            .map_err(process_sqlite_error)?;
                        let rows = stmt
                            .query_map(params![session_id], |row| {
                                Ok((row.get::<_, String>(0)?, row.get::<_, String>(1)?))
                            })
                            .map_err(process_sqlite_error)?;
                        let mut removed = Vec::new();
                        for row in rows {
                            let (process_id, record_json) = row.map_err(process_sqlite_error)?;
                            let record: ProcessRecord =
                                serde_json::from_str(&record_json).map_err(process_decode_error)?;
                            removed.push((process_id, record));
                        }
                        removed
                    };

                    // This implementation deletes no wake rows; the report
                    // always carries zero here.
                    let deleted_wake_count = 0;
                    let revoked_handle_count = tx
                        .execute(
                            "DELETE FROM process_handle_grants WHERE session_id = ?1",
                            params![session_id],
                        )
                        .map_err(process_sqlite_error)?;
                    let mut orphaned_process_ids = Vec::new();
                    let mut preserved_process_ids = Vec::new();
                    for (process_id, record) in removed {
                        // Terminal processes are reported in neither list.
                        if record.is_terminal() {
                            continue;
                        }
                        // Counted after the delete, so only grants held by
                        // other sessions remain.
                        let remaining_grants: i64 = tx
                            .query_row(
                                "SELECT COUNT(*) FROM process_handle_grants WHERE process_id = ?1",
                                params![process_id],
                                |row| row.get(0),
                            )
                            .map_err(process_sqlite_error)?;
                        if remaining_grants == 0 {
                            orphaned_process_ids.push(process_id);
                        } else {
                            preserved_process_ids.push(process_id);
                        }
                    }
                    Ok((
                        revoked_handle_count,
                        deleted_wake_count,
                        orphaned_process_ids,
                        preserved_process_ids,
                    ))
                })()))
            })
            .await
            .map_err(process_sqlite_error)??;
        // A session may hold several grants (one per scope) on the same
        // process, so the snapshot can repeat a process id.
        orphaned_process_ids.sort();
        orphaned_process_ids.dedup();
        preserved_process_ids.sort();
        preserved_process_ids.dedup();
        Ok(lash_core::ProcessSessionDeleteReport {
            session_id: session_id.to_string(),
            revoked_handle_count,
            deleted_wake_count,
            orphaned_process_ids,
            preserved_process_ids,
        })
    }
603
    /// Appends an event to `process_id`'s event log and updates the process
    /// record in the same write flow.
    ///
    /// The next sequence number is `MAX(sequence) + 1` for the process (1 for
    /// the first event). When the request carries a replay key, any event
    /// already stored under that key is looked up and handed to
    /// `prepare_process_event_append`; if that helper reports the request as
    /// a replay, the prepared event is returned and nothing is written.
    ///
    /// On a real append the event row is inserted, the record's status is
    /// updated when the prepared append carries a status update (clearing
    /// `wait` if the new status is terminal), `updated_at_ms` is refreshed,
    /// and waiters are notified after the flow succeeds.
    ///
    /// # Errors
    ///
    /// Returns `PluginError::Session` when `process_id` is unknown, plus any
    /// error from `prepare_process_event_append`, encoding, or SQLite.
    async fn append_event(
        &self,
        process_id: &str,
        request: ProcessEventAppendRequest,
    ) -> Result<ProcessEventAppendResult, lash_core::PluginError> {
        let process_id = process_id.to_string();
        let (result, appended) = self
            .conn
            .write_flow(move |tx| {
                Ok(tx_outcome((|| {
                    let mut record =
                        Self::load_process_conn(tx, &process_id)?.ok_or_else(|| {
                            lash_core::PluginError::Session(format!(
                                "unknown process `{process_id}`"
                            ))
                        })?;
                    // Only requests with a replay key can match an earlier event.
                    let replay_lookup = if let Some(replay_key) =
                        request.replay.as_ref().map(|replay| replay.key.as_str())
                    {
                        Self::load_event_by_key_conn(tx, &process_id, replay_key)?
                    } else {
                        None
                    };
                    // Sequence numbers are per process and start at 1.
                    let sequence = tx
                        .query_row(
                            "SELECT COALESCE(MAX(sequence), 0) + 1 FROM process_events WHERE process_id = ?1",
                            params![process_id],
                            |row| row.get::<_, i64>(0),
                        )
                        .map_err(process_sqlite_error)? as u64;
                    let occurred_at_ms = current_epoch_ms();
                    let prepared = prepare_process_event_append(
                        &record,
                        request,
                        sequence,
                        replay_lookup,
                        occurred_at_ms,
                    )?;
                    // Replays return the prepared event without writing; the
                    // `false` flag suppresses the waiter notification below.
                    if prepared.replayed {
                        return Ok((
                            ProcessEventAppendResult {
                                event: prepared.event,
                                wake_delivery: prepared.wake_delivery,
                            },
                            false,
                        ));
                    }
                    let event = prepared.event;
                    tx.execute(
                        "INSERT INTO process_events (
                            process_id, sequence, event_type, payload_hash, idempotency_key,
                            occurred_at_ms, event_json
                        )
                        VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7)",
                        params![
                            process_id,
                            sequence as i64,
                            event.event_type.as_str(),
                            prepared.payload_hash.as_str(),
                            event.invocation.replay_key(),
                            prepared.occurred_at_ms as i64,
                            process_encode_json(&event)?,
                        ],
                    )
                    .map_err(process_sqlite_error)?;
                    if let Some(status) = prepared.status_update.clone() {
                        record.status = status;
                        // A terminal process can no longer be waiting.
                        if record.status.is_terminal() {
                            record.wait = None;
                        }
                    }
                    record.updated_at_ms = prepared.occurred_at_ms;
                    Self::save_process_conn(tx, &record)?;
                    Ok((
                        ProcessEventAppendResult {
                            event,
                            wake_delivery: prepared.wake_delivery,
                        },
                        true,
                    ))
                })()))
            })
            .await
            .map_err(process_sqlite_error)??;
        if appended {
            self.notify.notify_waiters();
        }
        Ok(result)
    }
693
694 async fn events_after(
695 &self,
696 process_id: &str,
697 after_sequence: u64,
698 ) -> Result<Vec<ProcessEvent>, lash_core::PluginError> {
699 let process_id = process_id.to_string();
700 self.conn
701 .call(move |conn| {
702 Ok((|| {
703 if Self::load_process_conn(conn, &process_id)?.is_none() {
704 return Err(lash_core::PluginError::Session(format!(
705 "unknown process `{process_id}`"
706 )));
707 }
708 let mut stmt = conn
709 .prepare(
710 "SELECT event_json FROM process_events
711 WHERE process_id = ?1 AND sequence > ?2
712 ORDER BY sequence ASC",
713 )
714 .map_err(process_sqlite_error)?;
715 let rows = stmt
716 .query_map(params![process_id, after_sequence as i64], |row| {
717 row.get::<_, String>(0)
718 })
719 .map_err(process_sqlite_error)?;
720 let mut events = Vec::new();
721 for row in rows {
722 events.push(
723 serde_json::from_str(&row.map_err(process_sqlite_error)?)
724 .map_err(process_decode_error)?,
725 );
726 }
727 Ok(events)
728 })())
729 })
730 .await
731 .map_err(process_sqlite_error)?
732 }
733
734 async fn count_events_through(
735 &self,
736 process_id: &str,
737 event_type: &str,
738 up_to_sequence: u64,
739 ) -> Result<u64, lash_core::PluginError> {
740 let process_id = process_id.to_string();
741 let event_type = event_type.to_string();
742 self.conn
743 .call(move |conn| {
744 Ok((|| {
745 if Self::load_process_conn(conn, &process_id)?.is_none() {
746 return Err(lash_core::PluginError::Session(format!(
747 "unknown process `{process_id}`"
748 )));
749 }
750 conn.query_row(
751 "SELECT COUNT(*) FROM process_events
752 WHERE process_id = ?1 AND event_type = ?2 AND sequence <= ?3",
753 params![process_id, event_type, up_to_sequence as i64],
754 |row| row.get::<_, i64>(0),
755 )
756 .map(|count| count as u64)
757 .map_err(process_sqlite_error)
758 })())
759 })
760 .await
761 .map_err(process_sqlite_error)?
762 }
763
764 async fn recent_events(
765 &self,
766 process_id: &str,
767 limit: usize,
768 ) -> Result<Vec<ProcessEvent>, lash_core::PluginError> {
769 let process_id = process_id.to_string();
770 self.conn
771 .call(move |conn| {
772 Ok((|| {
773 if Self::load_process_conn(conn, &process_id)?.is_none() {
774 return Err(lash_core::PluginError::Session(format!(
775 "unknown process `{process_id}`"
776 )));
777 }
778 let mut stmt = conn
779 .prepare(
780 "SELECT event_json FROM process_events
781 WHERE process_id = ?1
782 ORDER BY sequence DESC
783 LIMIT ?2",
784 )
785 .map_err(process_sqlite_error)?;
786 let rows = stmt
787 .query_map(params![process_id, limit as i64], |row| {
788 row.get::<_, String>(0)
789 })
790 .map_err(process_sqlite_error)?;
791 let mut events: Vec<ProcessEvent> = Vec::new();
792 for row in rows {
793 events.push(
794 serde_json::from_str(&row.map_err(process_sqlite_error)?)
795 .map_err(process_decode_error)?,
796 );
797 }
798 events.reverse();
799 Ok(events)
800 })())
801 })
802 .await
803 .map_err(process_sqlite_error)?
804 }
805
806 async fn wake_events_after(
807 &self,
808 process_id: &str,
809 after_sequence: u64,
810 ) -> Result<Vec<ProcessEvent>, lash_core::PluginError> {
811 let acked: std::collections::HashSet<u64> = {
812 let process_id = process_id.to_string();
813 self.conn
814 .call(move |conn| {
815 Ok(
816 (|| -> Result<std::collections::HashSet<u64>, lash_core::PluginError> {
817 let mut stmt = conn
818 .prepare(
819 "SELECT sequence FROM process_wake_acks WHERE process_id = ?1",
820 )
821 .map_err(process_sqlite_error)?;
822 let rows = stmt
823 .query_map(params![process_id], |row| row.get::<_, i64>(0))
824 .map_err(process_sqlite_error)?;
825 let mut set = std::collections::HashSet::new();
826 for row in rows {
827 set.insert(row.map_err(process_sqlite_error)? as u64);
828 }
829 Ok(set)
830 })(),
831 )
832 })
833 .await
834 .map_err(process_sqlite_error)??
835 };
836 Ok(self
837 .events_after(process_id, after_sequence)
838 .await?
839 .into_iter()
840 .filter(|event| event.semantics.wake.is_some() && !acked.contains(&event.sequence))
841 .collect())
842 }
843
844 async fn wait_event_after(
845 &self,
846 process_id: &str,
847 event_type: &str,
848 after_sequence: u64,
849 ) -> Result<ProcessEvent, lash_core::PluginError> {
850 loop {
851 if let Some(event) = self
852 .events_after(process_id, after_sequence)
853 .await?
854 .into_iter()
855 .find(|event| event.event_type == event_type)
856 {
857 return Ok(event);
858 }
859 tokio::select! {
860 _ = self.notify.notified() => {}
861 _ = tokio::time::sleep(Duration::from_millis(50)) => {}
862 }
863 }
864 }
865
866 async fn await_process(
867 &self,
868 process_id: &str,
869 ) -> Result<ProcessAwaitOutput, lash_core::PluginError> {
870 loop {
871 let record = self.get_process(process_id).await.ok_or_else(|| {
872 lash_core::PluginError::Session(format!("unknown process `{process_id}`"))
873 })?;
874 if let Some(await_output) = record.status.await_output() {
875 return Ok(await_output.clone());
876 }
877 tokio::select! {
878 _ = self.notify.notified() => {}
879 _ = tokio::time::sleep(Duration::from_millis(50)) => {}
880 }
881 }
882 }
883
884 async fn complete_process(
885 &self,
886 process_id: &str,
887 await_output: ProcessAwaitOutput,
888 ) -> Result<ProcessRecord, lash_core::PluginError> {
889 let event_type = match await_output.terminal_state() {
890 lash_core::ProcessTerminalState::Completed => "process.completed",
891 lash_core::ProcessTerminalState::Failed => "process.failed",
892 lash_core::ProcessTerminalState::Cancelled => "process.cancelled",
893 };
894 self.append_event(
895 process_id,
896 ProcessEventAppendRequest::new(
897 event_type,
898 serde_json::json!({ "await_output": await_output }),
899 )
900 .with_replay_key(format!("process:{process_id}:terminal:{event_type}")),
901 )
902 .await?;
903 self.get_process(process_id).await.ok_or_else(|| {
904 lash_core::PluginError::Session(format!(
905 "unknown process `{process_id}` after terminal event"
906 ))
907 })
908 }
909
910 async fn set_process_wait(
911 &self,
912 process_id: &str,
913 wait: lash_core::WaitState,
914 ) -> Result<ProcessRecord, lash_core::PluginError> {
915 let process_id = process_id.to_string();
916 self.conn
917 .write_flow(move |tx| {
918 Ok(tx_outcome((|| {
919 let mut record =
920 Self::load_process_conn(tx, &process_id)?.ok_or_else(|| {
921 lash_core::PluginError::Session(format!(
922 "unknown process `{process_id}`"
923 ))
924 })?;
925 if record.is_terminal() {
926 return Err(lash_core::PluginError::Session(format!(
927 "terminal process `{process_id}` cannot enter a wait state"
928 )));
929 }
930 record.wait = Some(wait);
931 record.updated_at_ms = current_epoch_ms();
932 Self::save_process_conn(tx, &record)?;
933 Ok(record)
934 })()))
935 })
936 .await
937 .map_err(process_sqlite_error)?
938 }
939
940 async fn clear_process_wait(
941 &self,
942 process_id: &str,
943 ) -> Result<ProcessRecord, lash_core::PluginError> {
944 let process_id = process_id.to_string();
945 self.conn
946 .write_flow(move |tx| {
947 Ok(tx_outcome((|| {
948 let mut record =
949 Self::load_process_conn(tx, &process_id)?.ok_or_else(|| {
950 lash_core::PluginError::Session(format!(
951 "unknown process `{process_id}`"
952 ))
953 })?;
954 record.wait = None;
955 record.updated_at_ms = current_epoch_ms();
956 Self::save_process_conn(tx, &record)?;
957 Ok(record)
958 })()))
959 })
960 .await
961 .map_err(process_sqlite_error)?
962 }
963
964 async fn get_process(&self, process_id: &str) -> Option<ProcessRecord> {
965 let process_id = process_id.to_string();
966 self.conn
967 .call(move |conn| Ok(Self::load_process_conn(conn, &process_id).ok().flatten()))
968 .await
969 .ok()
970 .flatten()
971 }
972
973 async fn list_processes(
974 &self,
975 filter: &lash_core::ProcessListFilter,
976 ) -> Result<Vec<ProcessRecord>, lash_core::PluginError> {
977 let filter = filter.clone();
978 self.conn
979 .call(move |conn| {
980 Ok((|| {
981 let mut stmt = conn
982 .prepare(
983 "SELECT record_json FROM processes
984 ORDER BY process_id ASC",
985 )
986 .map_err(process_sqlite_error)?;
987 let rows = stmt
988 .query_map([], |row| row.get::<_, String>(0))
989 .map_err(process_sqlite_error)?;
990 let mut records = Vec::new();
991 for row in rows {
992 let record: ProcessRecord =
993 serde_json::from_str(&row.map_err(process_sqlite_error)?)
994 .map_err(process_decode_error)?;
995 if filter.matches_record(&record) {
996 records.push(record);
997 }
998 }
999 Ok(records)
1000 })())
1001 })
1002 .await
1003 .map_err(process_sqlite_error)?
1004 }
1005
1006 async fn ack_wake(
1007 &self,
1008 process_id: &str,
1009 sequence: u64,
1010 ) -> Result<(), lash_core::PluginError> {
1011 let process_id = process_id.to_string();
1012 self.conn
1013 .call(move |conn| {
1014 Ok((|| {
1015 if Self::load_process_conn(conn, &process_id)?.is_none() {
1016 return Err(lash_core::PluginError::Session(format!(
1017 "unknown process `{process_id}`"
1018 )));
1019 }
1020 conn.execute(
1021 "INSERT OR IGNORE INTO process_wake_acks (process_id, sequence) VALUES (?1, ?2)",
1022 params![process_id, sequence as i64],
1023 )
1024 .map_err(process_sqlite_error)?;
1025 Ok(())
1026 })())
1027 })
1028 .await
1029 .map_err(process_sqlite_error)?
1030 }
1031
1032 async fn list_non_terminal(&self) -> Result<Vec<ProcessRecord>, lash_core::PluginError> {
1033 self.conn
1034 .call(move |conn| {
1035 Ok((|| {
1036 let mut stmt = conn
1037 .prepare(
1038 "SELECT record_json FROM processes
1039 WHERE status = 'running'
1040 ORDER BY process_id ASC",
1041 )
1042 .map_err(process_sqlite_error)?;
1043 let rows = stmt
1044 .query_map([], |row| row.get::<_, String>(0))
1045 .map_err(process_sqlite_error)?;
1046 let mut records = Vec::new();
1047 for row in rows {
1048 let record: ProcessRecord =
1049 serde_json::from_str(&row.map_err(process_sqlite_error)?)
1050 .map_err(process_decode_error)?;
1051 records.push(record);
1052 }
1053 Ok(records)
1054 })())
1055 })
1056 .await
1057 .map_err(process_sqlite_error)?
1058 }
1059
    /// Claims (or re-claims) the lease on `process_id` for `owner_id`, valid
    /// for `lease_ttl_ms` from now.
    ///
    /// The claim is refused only when an unexpired lease is held by a
    /// different owner; an expired lease, a released lease, or a lease already
    /// held by `owner_id` is replaced. Each successful claim increments the
    /// stored fencing token by one (starting at 1 when no lease row exists)
    /// and issues a fresh lease token.
    ///
    /// # Errors
    ///
    /// Returns `PluginError::Session` when `process_id` is unknown or the
    /// lease is currently held by another owner.
    async fn claim_process_lease(
        &self,
        process_id: &str,
        owner_id: &str,
        lease_ttl_ms: u64,
    ) -> Result<ProcessLease, lash_core::PluginError> {
        let process_id = process_id.to_string();
        let owner_id = owner_id.to_string();
        self.conn
            .write_flow(move |tx| {
                Ok(tx_outcome((|| {
                    if Self::load_process_conn(tx, &process_id)?.is_none() {
                        return Err(lash_core::PluginError::Session(format!(
                            "unknown process `{process_id}`"
                        )));
                    }
                    let now = current_epoch_ms();
                    let current = Self::load_process_lease_conn(tx, &process_id)?;
                    // Conflict only if the lease is still live AND belongs to
                    // someone else; the same owner may re-claim at any time.
                    if let Some(current) = current.as_ref()
                        && current.expires_at_epoch_ms > now
                        && current.owner_id != owner_id
                    {
                        return Err(process_lease_conflict(&process_id, current));
                    }
                    // Read the fencing token straight from the row rather than
                    // from `current`: `load_process_lease_conn` yields `None`
                    // for a row with a NULL owner or token, but such a row
                    // still carries the last fencing token.
                    let fencing_token: u64 = tx
                        .query_row(
                            "SELECT lease_fencing_token FROM process_leases WHERE process_id = ?1",
                            params![process_id],
                            |row| row.get::<_, i64>(0),
                        )
                        .optional()
                        .map_err(process_sqlite_error)?
                        .unwrap_or(0) as u64
                        + 1;
                    let lease = ProcessLease {
                        schema_version: PROCESS_LEASE_SCHEMA_VERSION,
                        process_id: process_id.clone(),
                        owner_id: owner_id.clone(),
                        // The token is the hex SHA-256 of process id, owner,
                        // claim time and fencing token.
                        lease_token: format!(
                            "{:x}",
                            Sha256::digest(
                                format!("{process_id}:{owner_id}:{now}:{fencing_token}").as_bytes()
                            )
                        ),
                        fencing_token,
                        claimed_at_epoch_ms: now,
                        expires_at_epoch_ms: now.saturating_add(lease_ttl_ms),
                    };
                    tx.execute(
                        "INSERT INTO process_leases (
                            process_id, lease_owner_id, lease_token, lease_fencing_token,
                            lease_claimed_at_ms, lease_expires_at_ms
                        )
                        VALUES (?1, ?2, ?3, ?4, ?5, ?6)
                        ON CONFLICT(process_id) DO UPDATE SET
                            lease_owner_id = excluded.lease_owner_id,
                            lease_token = excluded.lease_token,
                            lease_fencing_token = excluded.lease_fencing_token,
                            lease_claimed_at_ms = excluded.lease_claimed_at_ms,
                            lease_expires_at_ms = excluded.lease_expires_at_ms",
                        params![
                            lease.process_id.as_str(),
                            lease.owner_id.as_str(),
                            lease.lease_token.as_str(),
                            lease.fencing_token as i64,
                            lease.claimed_at_epoch_ms as i64,
                            lease.expires_at_epoch_ms as i64,
                        ],
                    )
                    .map_err(process_sqlite_error)?;
                    Ok(lease)
                })()))
            })
            .await
            .map_err(process_sqlite_error)?
    }
1140
1141 async fn renew_process_lease(
1142 &self,
1143 lease: &ProcessLease,
1144 lease_ttl_ms: u64,
1145 ) -> Result<ProcessLease, lash_core::PluginError> {
1146 let lease = lease.clone();
1147 self.conn
1148 .write_flow(move |tx| {
1149 Ok(tx_outcome((|| {
1150 let now = current_epoch_ms();
1151 let current = Self::load_process_lease_conn(tx, &lease.process_id)?;
1152 if !guard_lease(current.as_ref(), &lease.lease_token, now) {
1153 return Err(process_lease_expired(&lease.process_id));
1154 }
1155 let renewed = ProcessLease {
1156 expires_at_epoch_ms: now.saturating_add(lease_ttl_ms),
1157 ..lease.clone()
1158 };
1159 tx.execute(
1160 "UPDATE process_leases
1161 SET lease_expires_at_ms = ?2
1162 WHERE process_id = ?1 AND lease_token = ?3",
1163 params![
1164 renewed.process_id.as_str(),
1165 renewed.expires_at_epoch_ms as i64,
1166 renewed.lease_token.as_str(),
1167 ],
1168 )
1169 .map_err(process_sqlite_error)?;
1170 Ok(renewed)
1171 })()))
1172 })
1173 .await
1174 .map_err(process_sqlite_error)?
1175 }
1176
1177 async fn complete_process_lease(
1178 &self,
1179 completion: &ProcessLeaseCompletion,
1180 ) -> Result<(), lash_core::PluginError> {
1181 let process_id = completion.process_id.clone();
1182 let lease_token = completion.lease_token.clone();
1183 self.conn
1184 .call(move |conn| {
1185 conn.execute(
1186 "UPDATE process_leases
1187 SET lease_owner_id = NULL,
1188 lease_token = NULL,
1189 lease_claimed_at_ms = 0,
1190 lease_expires_at_ms = 0
1191 WHERE process_id = ?1 AND lease_token = ?2",
1192 params![process_id, lease_token],
1193 )
1194 })
1195 .await
1196 .map_err(process_sqlite_error)?;
1197 Ok(())
1198 }
1199}
1200
1201fn process_lease_conflict(process_id: &str, current: &ProcessLease) -> lash_core::PluginError {
1204 lash_core::PluginError::Session(format!(
1205 "process `{process_id}` is already leased by `{}` until {}",
1206 current.owner_id, current.expires_at_epoch_ms
1207 ))
1208}
1209
1210fn process_lease_expired(process_id: &str) -> lash_core::PluginError {
1212 lash_core::PluginError::Session(format!(
1213 "process lease for `{process_id}` is missing or expired"
1214 ))
1215}