1use super::*;
26
27fn process_status_label(record: &ProcessRecord) -> &'static str {
28 record.status.label()
29}
30
31impl SqliteProcessRegistry {
32 pub async fn open(path: &Path) -> tokio_rusqlite::Result<Self> {
33 let conn = SqliteConnection::open(path).await?;
34 ensure_process_schema(&conn).await?;
35 apply_pragmas(&conn, StoreBacking::File).await?;
36 Ok(Self { conn })
37 }
38
39 pub async fn memory() -> tokio_rusqlite::Result<Self> {
40 let conn = SqliteConnection::open_in_memory().await?;
41 ensure_process_schema(&conn).await?;
42 apply_pragmas(&conn, StoreBacking::Memory).await?;
43 Ok(Self { conn })
44 }
45
46 fn load_process_conn(
47 conn: &Connection,
48 process_id: &str,
49 ) -> Result<Option<ProcessRecord>, lash_core::PluginError> {
50 let json: Option<String> = conn
51 .query_row(
52 "SELECT record_json FROM processes WHERE process_id = ?1",
53 params![process_id],
54 |row| row.get(0),
55 )
56 .optional()
57 .map_err(process_sqlite_error)?;
58 json.map(|json| serde_json::from_str(&json).map_err(process_decode_error))
59 .transpose()
60 }
61
62 fn save_process_conn(
63 conn: &Connection,
64 record: &ProcessRecord,
65 ) -> Result<(), lash_core::PluginError> {
66 conn.execute(
67 "UPDATE processes
68 SET updated_at_ms = ?2, status = ?3, record_json = ?4
69 WHERE process_id = ?1",
70 params![
71 record.id.as_str(),
72 record.updated_at_ms as i64,
73 process_status_label(record),
74 process_encode_json(record)?
75 ],
76 )
77 .map_err(process_sqlite_error)?;
78 Ok(())
79 }
80
81 fn load_event_by_key_conn(
82 conn: &Connection,
83 process_id: &str,
84 replay_key: &str,
85 ) -> Result<Option<(String, ProcessEvent)>, lash_core::PluginError> {
86 let row: Option<(String, String)> = conn
87 .query_row(
88 "SELECT payload_hash, event_json
89 FROM process_events
90 WHERE process_id = ?1 AND idempotency_key = ?2",
91 params![process_id, replay_key],
92 |row| Ok((row.get(0)?, row.get(1)?)),
93 )
94 .optional()
95 .map_err(process_sqlite_error)?;
96 row.map(|(hash, json)| {
97 serde_json::from_str(&json)
98 .map(|event| (hash, event))
99 .map_err(process_decode_error)
100 })
101 .transpose()
102 }
103
104 fn load_process_lease_conn(
105 conn: &Connection,
106 process_id: &str,
107 ) -> Result<Option<ProcessLease>, lash_core::PluginError> {
108 conn.query_row(
109 "SELECT lease_owner_id, lease_token, lease_fencing_token,
110 lease_claimed_at_ms, lease_expires_at_ms,
111 lease_owner_incarnation_id, lease_owner_liveness_json
112 FROM process_leases
113 WHERE process_id = ?1",
114 params![process_id],
115 |row| {
116 let owner_id: Option<String> = row.get(0)?;
117 let lease_token: Option<String> = row.get(1)?;
118 let incarnation_id: Option<String> = row.get(5)?;
119 let liveness_json: Option<String> = row.get(6)?;
120 let (Some(owner_id), Some(lease_token)) = (owner_id, lease_token) else {
121 return Ok(None);
122 };
123 Ok(Some(ProcessLease {
124 schema_version: PROCESS_LEASE_SCHEMA_VERSION,
125 process_id: process_id.to_string(),
126 owner: process_lease_owner_from_columns(
127 owner_id,
128 incarnation_id,
129 liveness_json,
130 ),
131 lease_token,
132 fencing_token: row.get::<_, i64>(2)? as u64,
133 claimed_at_epoch_ms: row.get::<_, i64>(3)? as u64,
134 expires_at_epoch_ms: row.get::<_, i64>(4)? as u64,
135 }))
136 },
137 )
138 .optional()
139 .map(|lease| lease.flatten())
140 .map_err(process_sqlite_error)
141 }
142
143 fn acquire_process_lease_conn(
146 conn: &Connection,
147 process_id: &str,
148 owner: &LeaseOwnerIdentity,
149 fencing_token: u64,
150 now: u64,
151 lease_ttl_ms: u64,
152 ) -> Result<ProcessLease, lash_core::PluginError> {
153 let lease = ProcessLease {
154 schema_version: PROCESS_LEASE_SCHEMA_VERSION,
155 process_id: process_id.to_string(),
156 owner: owner.clone(),
157 lease_token: format!(
158 "{:x}",
159 Sha256::digest(
160 format!(
161 "{process_id}:{}:{}:{now}:{fencing_token}",
162 owner.owner_id, owner.incarnation_id
163 )
164 .as_bytes()
165 )
166 ),
167 fencing_token,
168 claimed_at_epoch_ms: now,
169 expires_at_epoch_ms: now.saturating_add(lease_ttl_ms),
170 };
171 conn.execute(
172 "INSERT INTO process_leases (
173 process_id, lease_owner_id, lease_owner_incarnation_id,
174 lease_owner_liveness_json, lease_token, lease_fencing_token,
175 lease_claimed_at_ms, lease_expires_at_ms
176 )
177 VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8)
178 ON CONFLICT(process_id) DO UPDATE SET
179 lease_owner_id = excluded.lease_owner_id,
180 lease_owner_incarnation_id = excluded.lease_owner_incarnation_id,
181 lease_owner_liveness_json = excluded.lease_owner_liveness_json,
182 lease_token = excluded.lease_token,
183 lease_fencing_token = excluded.lease_fencing_token,
184 lease_claimed_at_ms = excluded.lease_claimed_at_ms,
185 lease_expires_at_ms = excluded.lease_expires_at_ms",
186 params![
187 lease.process_id.as_str(),
188 lease.owner.owner_id.as_str(),
189 lease.owner.incarnation_id.as_str(),
190 encode_process_lease_liveness(&lease.owner.liveness)?,
191 lease.lease_token.as_str(),
192 lease.fencing_token as i64,
193 lease.claimed_at_epoch_ms as i64,
194 lease.expires_at_epoch_ms as i64,
195 ],
196 )
197 .map_err(process_sqlite_error)?;
198 Ok(lease)
199 }
200
201 fn list_grants_for_scope_conn(
202 conn: &Connection,
203 session_scope: &SessionScope,
204 live_only: bool,
205 ) -> Result<Vec<ProcessHandleGrantEntry>, lash_core::PluginError> {
206 let session_scope_id = session_scope.id();
207 let status_clause = if live_only {
208 "AND p.status = 'running'"
209 } else {
210 ""
211 };
212 let mut stmt = conn
213 .prepare(&format!(
214 "SELECT g.process_id, g.descriptor_json, p.record_json
215 FROM process_handle_grants g
216 JOIN processes p ON p.process_id = g.process_id
217 WHERE g.scope_id = ?1 {status_clause}
218 ORDER BY g.process_id ASC"
219 ))
220 .map_err(process_sqlite_error)?;
221 let rows = stmt
222 .query_map(params![session_scope_id.as_str()], |row| {
223 Ok((
224 row.get::<_, String>(0)?,
225 row.get::<_, String>(1)?,
226 row.get::<_, String>(2)?,
227 ))
228 })
229 .map_err(process_sqlite_error)?;
230 let mut entries = Vec::new();
231 for row in rows {
232 let (process_id, descriptor_json, record_json) = row.map_err(process_sqlite_error)?;
233 let descriptor: ProcessHandleDescriptor =
234 serde_json::from_str(&descriptor_json).map_err(process_decode_error)?;
235 let record: ProcessRecord =
236 serde_json::from_str(&record_json).map_err(process_decode_error)?;
237 entries.push((
238 ProcessHandleGrant {
239 session_id: session_scope.session_id.clone(),
240 process_id,
241 descriptor,
242 },
243 record,
244 ));
245 }
246 Ok(entries)
247 }
248}
249
250fn tx_outcome<T>(
255 result: Result<T, lash_core::PluginError>,
256) -> TxOutcome<Result<T, lash_core::PluginError>> {
257 match result {
258 Ok(value) => TxOutcome::Commit(Ok(value)),
259 Err(err) => TxOutcome::Rollback(Err(err)),
260 }
261}
262
263#[async_trait::async_trait]
264impl ProcessRegistry for SqliteProcessRegistry {
265 fn durability_tier(&self) -> DurabilityTier {
266 DurabilityTier::Durable
267 }
268
269 async fn register_process(
270 &self,
271 registration: ProcessRegistration,
272 ) -> Result<ProcessRecord, lash_core::PluginError> {
273 let (registration, registration_hash) = prepare_process_registration(registration)?;
274 let record = self
275 .conn
276 .write_flow(move |tx| {
277 Ok(tx_outcome((|| {
278 if let Some(existing) = Self::load_process_conn(tx, ®istration.id)? {
279 if existing.registration_hash == registration_hash {
280 return Ok(existing);
281 }
282 return Err(lash_core::PluginError::Session(format!(
283 "process `{}` registration hash conflict: existing {}, new {}",
284 registration.id, existing.registration_hash, registration_hash
285 )));
286 }
287 let now = current_epoch_ms();
288 let record = ProcessRecord::from_prepared_registration(
289 registration,
290 registration_hash,
291 now,
292 );
293 let originator_scope_id = record.originator_scope_id();
294 tx.execute(
295 "INSERT INTO processes (
296 process_id, registration_hash, owner_scope_id,
297 created_at_ms, updated_at_ms, status, record_json
298 )
299 VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7)",
300 params![
301 record.id.as_str(),
302 record.registration_hash.as_str(),
303 originator_scope_id.as_str(),
304 record.created_at_ms as i64,
305 record.updated_at_ms as i64,
306 process_status_label(&record),
307 process_encode_json(&record)?,
308 ],
309 )
310 .map_err(process_sqlite_error)?;
311 Ok(record)
312 })()))
313 })
314 .await
315 .map_err(process_sqlite_error)??;
316 Ok(record)
317 }
318
319 async fn set_external_ref(
320 &self,
321 process_id: &str,
322 external_ref: ProcessExternalRef,
323 ) -> Result<ProcessRecord, lash_core::PluginError> {
324 let process_id = process_id.to_string();
325 let (record, _changed) = self
326 .conn
327 .write_flow(move |tx| {
328 Ok(tx_outcome((|| {
329 let mut record =
330 Self::load_process_conn(tx, &process_id)?.ok_or_else(|| {
331 lash_core::PluginError::Session(format!(
332 "unknown process `{process_id}`"
333 ))
334 })?;
335 if let Some(existing) = &record.external_ref {
336 if existing == &external_ref {
337 return Ok((record, false));
338 }
339 return Err(process_external_ref_conflict(
340 &process_id,
341 existing,
342 &external_ref,
343 ));
344 }
345 record.external_ref = Some(external_ref);
346 record.updated_at_ms = current_epoch_ms();
347 Self::save_process_conn(tx, &record)?;
348 Ok((record, true))
349 })()))
350 })
351 .await
352 .map_err(process_sqlite_error)??;
353 Ok(record)
354 }
355
356 async fn grant_handle(
357 &self,
358 session_scope: &SessionScope,
359 process_id: &str,
360 descriptor: ProcessHandleDescriptor,
361 ) -> Result<ProcessHandleGrant, lash_core::PluginError> {
362 let session_scope = session_scope.clone();
363 let process_id = process_id.to_string();
364 self.conn
365 .write_flow(move |tx| {
366 Ok(tx_outcome((|| {
367 let session_scope_id = session_scope.id();
368 if Self::load_process_conn(tx, &process_id)?.is_none() {
369 return Err(lash_core::PluginError::Session(format!(
370 "unknown process `{process_id}`"
371 )));
372 }
373 tx.execute(
374 "INSERT INTO process_handle_grants (session_id, scope_id, process_id, descriptor_json)
375 VALUES (?1, ?2, ?3, ?4)
376 ON CONFLICT(scope_id, process_id) DO UPDATE SET
377 session_id = excluded.session_id,
378 descriptor_json = excluded.descriptor_json",
379 params![
380 session_scope.session_id.as_str(),
381 session_scope_id.as_str(),
382 process_id.as_str(),
383 process_encode_json(&descriptor)?
384 ],
385 )
386 .map_err(process_sqlite_error)?;
387 Ok(ProcessHandleGrant {
388 session_id: session_scope.session_id.clone(),
389 process_id: process_id.clone(),
390 descriptor,
391 })
392 })()))
393 })
394 .await
395 .map_err(process_sqlite_error)?
396 }
397
398 async fn revoke_handle(
399 &self,
400 session_scope: &SessionScope,
401 process_id: &str,
402 ) -> Result<(), lash_core::PluginError> {
403 let session_scope_id = session_scope.id().as_str().to_string();
404 let process_id = process_id.to_string();
405 self.conn
406 .call(move |conn| {
407 conn.execute(
408 "DELETE FROM process_handle_grants WHERE scope_id = ?1 AND process_id = ?2",
409 params![session_scope_id, process_id],
410 )
411 })
412 .await
413 .map_err(process_sqlite_error)?;
414 Ok(())
415 }
416
417 async fn transfer_handle_grants(
418 &self,
419 from_scope: &SessionScope,
420 to_scope: &SessionScope,
421 process_ids: &[String],
422 ) -> Result<(), lash_core::PluginError> {
423 let from_scope = from_scope.clone();
424 let to_scope = to_scope.clone();
425 let process_ids = process_ids.to_vec();
426 self.conn
427 .write_flow(move |tx| {
428 Ok(tx_outcome((|| {
429 let from_scope_id = from_scope.id();
430 let to_scope_id = to_scope.id();
431 for process_id in &process_ids {
432 let descriptor_json: Option<String> = tx
433 .query_row(
434 "SELECT descriptor_json
435 FROM process_handle_grants
436 WHERE scope_id = ?1 AND process_id = ?2",
437 params![from_scope_id.as_str(), process_id.as_str()],
438 |row| row.get(0),
439 )
440 .optional()
441 .map_err(process_sqlite_error)?;
442 let Some(descriptor_json) = descriptor_json else {
443 return Err(lash_core::PluginError::Session(format!(
444 "process handle `{process_id}` is not granted to session `{}`",
445 from_scope.session_id
446 )));
447 };
448 tx.execute(
449 "DELETE FROM process_handle_grants
450 WHERE scope_id = ?1 AND process_id = ?2",
451 params![from_scope_id.as_str(), process_id.as_str()],
452 )
453 .map_err(process_sqlite_error)?;
454 tx.execute(
455 "INSERT INTO process_handle_grants (session_id, scope_id, process_id, descriptor_json)
456 VALUES (?1, ?2, ?3, ?4)
457 ON CONFLICT(scope_id, process_id) DO UPDATE SET
458 session_id = excluded.session_id,
459 descriptor_json = excluded.descriptor_json",
460 params![
461 to_scope.session_id.as_str(),
462 to_scope_id.as_str(),
463 process_id.as_str(),
464 descriptor_json
465 ],
466 )
467 .map_err(process_sqlite_error)?;
468 }
469 Ok(())
470 })()))
471 })
472 .await
473 .map_err(process_sqlite_error)?
474 }
475
476 async fn list_handle_grants(
477 &self,
478 session_scope: &SessionScope,
479 ) -> Result<Vec<ProcessHandleGrantEntry>, lash_core::PluginError> {
480 let session_scope = session_scope.clone();
481 self.conn
482 .call(move |conn| {
483 Ok(Self::list_grants_for_scope_conn(
484 conn,
485 &session_scope,
486 false,
487 ))
488 })
489 .await
490 .map_err(process_sqlite_error)?
491 }
492
493 async fn list_live_handle_grants(
494 &self,
495 session_scope: &SessionScope,
496 ) -> Result<Vec<ProcessHandleGrantEntry>, lash_core::PluginError> {
497 let session_scope = session_scope.clone();
498 self.conn
499 .call(move |conn| Ok(Self::list_grants_for_scope_conn(conn, &session_scope, true)))
500 .await
501 .map_err(process_sqlite_error)?
502 }
503
504 async fn has_handle_grant(
505 &self,
506 session_scope: &SessionScope,
507 process_id: &str,
508 ) -> Result<bool, lash_core::PluginError> {
509 let session_scope_id = session_scope.id().as_str().to_string();
510 let process_id = process_id.to_string();
511 self.conn
512 .call(move |conn| {
513 let exists = conn
514 .query_row(
515 "SELECT 1
516 FROM process_handle_grants g
517 JOIN processes p ON p.process_id = g.process_id
518 WHERE g.scope_id = ?1 AND g.process_id = ?2
519 LIMIT 1",
520 params![session_scope_id, process_id],
521 |_| Ok(()),
522 )
523 .optional()?
524 .is_some();
525 Ok(exists)
526 })
527 .await
528 .map_err(process_sqlite_error)
529 }
530
531 async fn handle_grants_for_process(
532 &self,
533 process_id: &str,
534 ) -> Result<Vec<ProcessHandleGrant>, lash_core::PluginError> {
535 let process_id = process_id.to_string();
536 self.conn
537 .call(move |conn| {
538 Ok((|| {
539 if Self::load_process_conn(conn, &process_id)?.is_none() {
540 return Err(lash_core::PluginError::Session(format!(
541 "unknown process `{process_id}`"
542 )));
543 }
544 let mut stmt = conn
545 .prepare(
546 "SELECT session_id, descriptor_json
547 FROM process_handle_grants
548 WHERE process_id = ?1
549 ORDER BY session_id ASC, scope_id ASC",
550 )
551 .map_err(process_sqlite_error)?;
552 let rows = stmt
553 .query_map(params![process_id], |row| {
554 Ok((row.get::<_, String>(0)?, row.get::<_, String>(1)?))
555 })
556 .map_err(process_sqlite_error)?;
557 let mut grants = Vec::new();
558 for row in rows {
559 let (session_id, descriptor_json) = row.map_err(process_sqlite_error)?;
560 let descriptor: ProcessHandleDescriptor =
561 serde_json::from_str(&descriptor_json).map_err(process_decode_error)?;
562 grants.push(ProcessHandleGrant {
563 session_id,
564 process_id: process_id.clone(),
565 descriptor,
566 });
567 }
568 Ok(grants)
569 })())
570 })
571 .await
572 .map_err(process_sqlite_error)?
573 }
574
575 async fn delete_session_process_state(
576 &self,
577 session_id: &str,
578 ) -> Result<lash_core::ProcessSessionDeleteReport, lash_core::PluginError> {
579 let session_id_owned = session_id.to_string();
580 let (
581 revoked_handle_count,
582 deleted_wake_count,
583 mut orphaned_process_ids,
584 mut preserved_process_ids,
585 ) = self
586 .conn
587 .write_flow(move |tx| {
588 Ok(tx_outcome((|| {
589 let session_id = session_id_owned;
590 let removed = {
591 let mut stmt = tx
592 .prepare(
593 "SELECT g.process_id, p.record_json
594 FROM process_handle_grants g
595 JOIN processes p ON p.process_id = g.process_id
596 WHERE g.session_id = ?1
597 ORDER BY g.process_id ASC",
598 )
599 .map_err(process_sqlite_error)?;
600 let rows = stmt
601 .query_map(params![session_id], |row| {
602 Ok((row.get::<_, String>(0)?, row.get::<_, String>(1)?))
603 })
604 .map_err(process_sqlite_error)?;
605 let mut removed = Vec::new();
606 for row in rows {
607 let (process_id, record_json) = row.map_err(process_sqlite_error)?;
608 let record: ProcessRecord =
609 serde_json::from_str(&record_json).map_err(process_decode_error)?;
610 removed.push((process_id, record));
611 }
612 removed
613 };
614
615 let deleted_wake_count = 0;
620 let revoked_handle_count = tx
621 .execute(
622 "DELETE FROM process_handle_grants WHERE session_id = ?1",
623 params![session_id],
624 )
625 .map_err(process_sqlite_error)?;
626 let mut orphaned_process_ids = Vec::new();
627 let mut preserved_process_ids = Vec::new();
628 for (process_id, record) in removed {
629 if record.is_terminal() {
630 continue;
631 }
632 let remaining_grants: i64 = tx
633 .query_row(
634 "SELECT COUNT(*) FROM process_handle_grants WHERE process_id = ?1",
635 params![process_id],
636 |row| row.get(0),
637 )
638 .map_err(process_sqlite_error)?;
639 if remaining_grants == 0 {
640 orphaned_process_ids.push(process_id);
641 } else {
642 preserved_process_ids.push(process_id);
643 }
644 }
645 let wake_targeted = {
646 let mut stmt = tx
647 .prepare("SELECT process_id, record_json FROM processes ORDER BY process_id ASC")
648 .map_err(process_sqlite_error)?;
649 let rows = stmt
650 .query_map([], |row| {
651 Ok((row.get::<_, String>(0)?, row.get::<_, String>(1)?))
652 })
653 .map_err(process_sqlite_error)?;
654 let mut records = Vec::new();
655 for row in rows {
656 let (process_id, record_json) = row.map_err(process_sqlite_error)?;
657 let record: ProcessRecord =
658 serde_json::from_str(&record_json).map_err(process_decode_error)?;
659 records.push((process_id, record));
660 }
661 records
662 };
663 for (_process_id, mut record) in wake_targeted {
664 if record.clear_wake_target_for_session(&session_id) {
665 Self::save_process_conn(tx, &record)?;
666 }
667 }
668 Ok((
669 revoked_handle_count,
670 deleted_wake_count,
671 orphaned_process_ids,
672 preserved_process_ids,
673 ))
674 })()))
675 })
676 .await
677 .map_err(process_sqlite_error)??;
678 orphaned_process_ids.sort();
679 orphaned_process_ids.dedup();
680 preserved_process_ids.sort();
681 preserved_process_ids.dedup();
682 Ok(lash_core::ProcessSessionDeleteReport {
683 session_id: session_id.to_string(),
684 revoked_handle_count,
685 deleted_wake_count,
686 orphaned_process_ids,
687 preserved_process_ids,
688 })
689 }
690
691 async fn append_event(
692 &self,
693 process_id: &str,
694 request: ProcessEventAppendRequest,
695 ) -> Result<ProcessEventAppendResult, lash_core::PluginError> {
696 let process_id = process_id.to_string();
697 let (result, _appended) = self
698 .conn
699 .write_flow(move |tx| {
700 Ok(tx_outcome((|| {
701 let mut record =
702 Self::load_process_conn(tx, &process_id)?.ok_or_else(|| {
703 lash_core::PluginError::Session(format!(
704 "unknown process `{process_id}`"
705 ))
706 })?;
707 let replay_lookup = if let Some(replay_key) =
708 request.replay.as_ref().map(|replay| replay.key.as_str())
709 {
710 Self::load_event_by_key_conn(tx, &process_id, replay_key)?
711 } else {
712 None
713 };
714 let sequence = tx
715 .query_row(
716 "SELECT COALESCE(MAX(sequence), 0) + 1 FROM process_events WHERE process_id = ?1",
717 params![process_id],
718 |row| row.get::<_, i64>(0),
719 )
720 .map_err(process_sqlite_error)? as u64;
721 let occurred_at_ms = current_epoch_ms();
722 let prepared = prepare_process_event_append(
723 &record,
724 request,
725 sequence,
726 replay_lookup,
727 occurred_at_ms,
728 )?;
729 match prepared {
730 lash_core::ProcessEventAppendPlan::Replay {
731 event,
732 repair_status,
733 wake_delivery,
734 occurred_at_ms,
735 } => {
736 let repaired = if let Some(status) = repair_status {
737 lash_core::apply_process_status_projection(
738 &mut record,
739 status,
740 occurred_at_ms,
741 );
742 Self::save_process_conn(tx, &record)?;
743 true
744 } else {
745 false
746 };
747 Ok((
748 ProcessEventAppendResult {
749 event,
750 wake_delivery,
751 },
752 repaired,
753 ))
754 }
755 lash_core::ProcessEventAppendPlan::Insert {
756 event,
757 payload_hash,
758 status_update,
759 wake_delivery,
760 occurred_at_ms,
761 } => {
762 tx.execute(
763 "INSERT INTO process_events (
764 process_id, sequence, event_type, payload_hash, idempotency_key,
765 occurred_at_ms, event_json
766 )
767 VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7)",
768 params![
769 process_id,
770 sequence as i64,
771 event.event_type.as_str(),
772 payload_hash.as_str(),
773 event.invocation.replay_key(),
774 occurred_at_ms as i64,
775 process_encode_json(&event)?,
776 ],
777 )
778 .map_err(process_sqlite_error)?;
779 if let Some(status) = status_update {
780 lash_core::apply_process_status_projection(
781 &mut record,
782 status,
783 occurred_at_ms,
784 );
785 } else {
786 record.updated_at_ms = occurred_at_ms;
787 }
788 Self::save_process_conn(tx, &record)?;
789 Ok((
790 ProcessEventAppendResult {
791 event,
792 wake_delivery,
793 },
794 true,
795 ))
796 }
797 }
798 })()))
799 })
800 .await
801 .map_err(process_sqlite_error)??;
802 Ok(result)
803 }
804
805 async fn events_after(
806 &self,
807 process_id: &str,
808 after_sequence: u64,
809 ) -> Result<Vec<ProcessEvent>, lash_core::PluginError> {
810 let process_id = process_id.to_string();
811 self.conn
812 .call(move |conn| {
813 Ok((|| {
814 if Self::load_process_conn(conn, &process_id)?.is_none() {
815 return Err(lash_core::PluginError::Session(format!(
816 "unknown process `{process_id}`"
817 )));
818 }
819 let mut stmt = conn
820 .prepare(
821 "SELECT event_json FROM process_events
822 WHERE process_id = ?1 AND sequence > ?2
823 ORDER BY sequence ASC",
824 )
825 .map_err(process_sqlite_error)?;
826 let rows = stmt
827 .query_map(params![process_id, after_sequence as i64], |row| {
828 row.get::<_, String>(0)
829 })
830 .map_err(process_sqlite_error)?;
831 let mut events = Vec::new();
832 for row in rows {
833 events.push(
834 serde_json::from_str(&row.map_err(process_sqlite_error)?)
835 .map_err(process_decode_error)?,
836 );
837 }
838 Ok(events)
839 })())
840 })
841 .await
842 .map_err(process_sqlite_error)?
843 }
844
845 async fn count_events_through(
846 &self,
847 process_id: &str,
848 event_type: &str,
849 up_to_sequence: u64,
850 ) -> Result<u64, lash_core::PluginError> {
851 let process_id = process_id.to_string();
852 let event_type = event_type.to_string();
853 self.conn
854 .call(move |conn| {
855 Ok((|| {
856 if Self::load_process_conn(conn, &process_id)?.is_none() {
857 return Err(lash_core::PluginError::Session(format!(
858 "unknown process `{process_id}`"
859 )));
860 }
861 conn.query_row(
862 "SELECT COUNT(*) FROM process_events
863 WHERE process_id = ?1 AND event_type = ?2 AND sequence <= ?3",
864 params![process_id, event_type, up_to_sequence as i64],
865 |row| row.get::<_, i64>(0),
866 )
867 .map(|count| count as u64)
868 .map_err(process_sqlite_error)
869 })())
870 })
871 .await
872 .map_err(process_sqlite_error)?
873 }
874
875 async fn recent_events(
876 &self,
877 process_id: &str,
878 limit: usize,
879 ) -> Result<Vec<ProcessEvent>, lash_core::PluginError> {
880 let process_id = process_id.to_string();
881 self.conn
882 .call(move |conn| {
883 Ok((|| {
884 if Self::load_process_conn(conn, &process_id)?.is_none() {
885 return Err(lash_core::PluginError::Session(format!(
886 "unknown process `{process_id}`"
887 )));
888 }
889 let mut stmt = conn
890 .prepare(
891 "SELECT event_json FROM process_events
892 WHERE process_id = ?1
893 ORDER BY sequence DESC
894 LIMIT ?2",
895 )
896 .map_err(process_sqlite_error)?;
897 let rows = stmt
898 .query_map(params![process_id, limit as i64], |row| {
899 row.get::<_, String>(0)
900 })
901 .map_err(process_sqlite_error)?;
902 let mut events: Vec<ProcessEvent> = Vec::new();
903 for row in rows {
904 events.push(
905 serde_json::from_str(&row.map_err(process_sqlite_error)?)
906 .map_err(process_decode_error)?,
907 );
908 }
909 events.reverse();
910 Ok(events)
911 })())
912 })
913 .await
914 .map_err(process_sqlite_error)?
915 }
916
917 async fn wake_events_after(
918 &self,
919 process_id: &str,
920 after_sequence: u64,
921 ) -> Result<Vec<ProcessEvent>, lash_core::PluginError> {
922 let acked: std::collections::HashSet<u64> = {
923 let process_id = process_id.to_string();
924 self.conn
925 .call(move |conn| {
926 Ok(
927 (|| -> Result<std::collections::HashSet<u64>, lash_core::PluginError> {
928 let mut stmt = conn
929 .prepare(
930 "SELECT sequence FROM process_wake_acks WHERE process_id = ?1",
931 )
932 .map_err(process_sqlite_error)?;
933 let rows = stmt
934 .query_map(params![process_id], |row| row.get::<_, i64>(0))
935 .map_err(process_sqlite_error)?;
936 let mut set = std::collections::HashSet::new();
937 for row in rows {
938 set.insert(row.map_err(process_sqlite_error)? as u64);
939 }
940 Ok(set)
941 })(),
942 )
943 })
944 .await
945 .map_err(process_sqlite_error)??
946 };
947 Ok(self
948 .events_after(process_id, after_sequence)
949 .await?
950 .into_iter()
951 .filter(|event| event.semantics.wake.is_some() && !acked.contains(&event.sequence))
952 .collect())
953 }
954
955 async fn complete_process(
956 &self,
957 process_id: &str,
958 await_output: ProcessAwaitOutput,
959 ) -> Result<ProcessRecord, lash_core::PluginError> {
960 let event_type = match await_output.terminal_state() {
961 lash_core::ProcessTerminalState::Completed => "process.completed",
962 lash_core::ProcessTerminalState::Failed => "process.failed",
963 lash_core::ProcessTerminalState::Cancelled => "process.cancelled",
964 lash_core::ProcessTerminalState::Abandoned => "process.abandoned",
965 };
966 self.append_event(
967 process_id,
968 ProcessEventAppendRequest::new(
969 event_type,
970 serde_json::json!({ "await_output": await_output }),
971 )
972 .with_replay_key(format!("process:{process_id}:terminal:{event_type}")),
973 )
974 .await?;
975 self.get_process(process_id).await.ok_or_else(|| {
976 lash_core::PluginError::Session(format!(
977 "unknown process `{process_id}` after terminal event"
978 ))
979 })
980 }
981
982 async fn record_first_started(
983 &self,
984 process_id: &str,
985 started: ProcessStarted,
986 ) -> Result<ProcessRecord, lash_core::PluginError> {
987 let process_id = process_id.to_string();
988 self.conn
989 .write_flow(move |tx| {
990 Ok(tx_outcome((|| {
991 let mut record =
992 Self::load_process_conn(tx, &process_id)?.ok_or_else(|| {
993 lash_core::PluginError::Session(format!(
994 "unknown process `{process_id}`"
995 ))
996 })?;
997 if record.first_started.is_none() {
999 record.first_started = Some(Box::new(started));
1000 record.updated_at_ms = current_epoch_ms();
1001 Self::save_process_conn(tx, &record)?;
1002 }
1003 Ok(record)
1004 })()))
1005 })
1006 .await
1007 .map_err(process_sqlite_error)?
1008 }
1009
1010 async fn request_process_abandon(
1011 &self,
1012 process_id: &str,
1013 request: AbandonRequest,
1014 ) -> Result<ProcessRecord, lash_core::PluginError> {
1015 let process_id = process_id.to_string();
1016 self.conn
1017 .write_flow(move |tx| {
1018 Ok(tx_outcome((|| {
1019 let mut record =
1020 Self::load_process_conn(tx, &process_id)?.ok_or_else(|| {
1021 lash_core::PluginError::Session(format!(
1022 "unknown process `{process_id}`"
1023 ))
1024 })?;
1025 if record.is_terminal() {
1026 return Err(lash_core::PluginError::Session(format!(
1027 "terminal process `{process_id}` cannot accept an abandon request"
1028 )));
1029 }
1030 if record.abandon_request.is_none() {
1032 record.abandon_request = Some(Box::new(request));
1033 record.updated_at_ms = current_epoch_ms();
1034 Self::save_process_conn(tx, &record)?;
1035 }
1036 Ok(record)
1037 })()))
1038 })
1039 .await
1040 .map_err(process_sqlite_error)?
1041 }
1042
1043 async fn set_process_wait(
1044 &self,
1045 process_id: &str,
1046 wait: lash_core::WaitState,
1047 ) -> Result<ProcessRecord, lash_core::PluginError> {
1048 let process_id = process_id.to_string();
1049 self.conn
1050 .write_flow(move |tx| {
1051 Ok(tx_outcome((|| {
1052 let mut record =
1053 Self::load_process_conn(tx, &process_id)?.ok_or_else(|| {
1054 lash_core::PluginError::Session(format!(
1055 "unknown process `{process_id}`"
1056 ))
1057 })?;
1058 if record.is_terminal() {
1059 return Err(lash_core::PluginError::Session(format!(
1060 "terminal process `{process_id}` cannot enter a wait state"
1061 )));
1062 }
1063 record.wait = Some(wait);
1064 record.updated_at_ms = current_epoch_ms();
1065 Self::save_process_conn(tx, &record)?;
1066 Ok(record)
1067 })()))
1068 })
1069 .await
1070 .map_err(process_sqlite_error)?
1071 }
1072
1073 async fn clear_process_wait(
1074 &self,
1075 process_id: &str,
1076 ) -> Result<ProcessRecord, lash_core::PluginError> {
1077 let process_id = process_id.to_string();
1078 self.conn
1079 .write_flow(move |tx| {
1080 Ok(tx_outcome((|| {
1081 let mut record =
1082 Self::load_process_conn(tx, &process_id)?.ok_or_else(|| {
1083 lash_core::PluginError::Session(format!(
1084 "unknown process `{process_id}`"
1085 ))
1086 })?;
1087 record.wait = None;
1088 record.updated_at_ms = current_epoch_ms();
1089 Self::save_process_conn(tx, &record)?;
1090 Ok(record)
1091 })()))
1092 })
1093 .await
1094 .map_err(process_sqlite_error)?
1095 }
1096
1097 async fn get_process(&self, process_id: &str) -> Option<ProcessRecord> {
1098 let process_id = process_id.to_string();
1099 self.conn
1100 .call(move |conn| Ok(Self::load_process_conn(conn, &process_id).ok().flatten()))
1101 .await
1102 .ok()
1103 .flatten()
1104 }
1105
1106 async fn list_processes(
1107 &self,
1108 filter: &lash_core::ProcessListFilter,
1109 ) -> Result<Vec<ProcessRecord>, lash_core::PluginError> {
1110 let filter = filter.clone();
1111 self.conn
1112 .call(move |conn| {
1113 Ok((|| {
1114 let mut stmt = conn
1115 .prepare(
1116 "SELECT record_json FROM processes
1117 ORDER BY process_id ASC",
1118 )
1119 .map_err(process_sqlite_error)?;
1120 let rows = stmt
1121 .query_map([], |row| row.get::<_, String>(0))
1122 .map_err(process_sqlite_error)?;
1123 let mut records = Vec::new();
1124 for row in rows {
1125 let record: ProcessRecord =
1126 serde_json::from_str(&row.map_err(process_sqlite_error)?)
1127 .map_err(process_decode_error)?;
1128 if filter.matches_record(&record) {
1129 records.push(record);
1130 }
1131 }
1132 Ok(records)
1133 })())
1134 })
1135 .await
1136 .map_err(process_sqlite_error)?
1137 }
1138
1139 async fn ack_wake(
1140 &self,
1141 process_id: &str,
1142 sequence: u64,
1143 ) -> Result<(), lash_core::PluginError> {
1144 let process_id = process_id.to_string();
1145 self.conn
1146 .call(move |conn| {
1147 Ok((|| {
1148 if Self::load_process_conn(conn, &process_id)?.is_none() {
1149 return Err(lash_core::PluginError::Session(format!(
1150 "unknown process `{process_id}`"
1151 )));
1152 }
1153 conn.execute(
1154 "INSERT OR IGNORE INTO process_wake_acks (process_id, sequence) VALUES (?1, ?2)",
1155 params![process_id, sequence as i64],
1156 )
1157 .map_err(process_sqlite_error)?;
1158 Ok(())
1159 })())
1160 })
1161 .await
1162 .map_err(process_sqlite_error)?
1163 }
1164
1165 async fn list_non_terminal(&self) -> Result<Vec<ProcessRecord>, lash_core::PluginError> {
1166 self.conn
1167 .call(move |conn| {
1168 Ok((|| {
1169 let mut stmt = conn
1170 .prepare(
1171 "SELECT record_json FROM processes
1172 WHERE status = 'running'
1173 ORDER BY process_id ASC",
1174 )
1175 .map_err(process_sqlite_error)?;
1176 let rows = stmt
1177 .query_map([], |row| row.get::<_, String>(0))
1178 .map_err(process_sqlite_error)?;
1179 let mut records = Vec::new();
1180 for row in rows {
1181 let record: ProcessRecord =
1182 serde_json::from_str(&row.map_err(process_sqlite_error)?)
1183 .map_err(process_decode_error)?;
1184 records.push(record);
1185 }
1186 Ok(records)
1187 })())
1188 })
1189 .await
1190 .map_err(process_sqlite_error)?
1191 }
1192
1193 async fn claim_process_lease(
1194 &self,
1195 process_id: &str,
1196 owner: &LeaseOwnerIdentity,
1197 lease_ttl_ms: u64,
1198 ) -> Result<ProcessLeaseClaimOutcome, lash_core::PluginError> {
1199 let process_id = process_id.to_string();
1200 let owner = owner.clone();
1201 self.conn
1202 .write_flow(move |tx| {
1203 Ok(tx_outcome((|| {
1204 if Self::load_process_conn(tx, &process_id)?.is_none() {
1205 return Err(lash_core::PluginError::Session(format!(
1206 "unknown process `{process_id}`"
1207 )));
1208 }
1209 let now = current_epoch_ms();
1210 let current = Self::load_process_lease_conn(tx, &process_id)?;
1211 if let Some(current) = current.as_ref()
1212 && current.expires_at_epoch_ms > now
1213 {
1214 if current.owner.same_incarnation(&owner) {
1215 let expires_at = now.saturating_add(lease_ttl_ms);
1218 tx.execute(
1219 "UPDATE process_leases
1220 SET lease_expires_at_ms = ?2
1221 WHERE process_id = ?1",
1222 params![process_id, expires_at as i64],
1223 )
1224 .map_err(process_sqlite_error)?;
1225 return Ok(ProcessLeaseClaimOutcome::Acquired(ProcessLease {
1226 expires_at_epoch_ms: expires_at,
1227 ..current.clone()
1228 }));
1229 }
1230 return Ok(ProcessLeaseClaimOutcome::Busy {
1231 holder: current.clone(),
1232 });
1233 }
1234 let fencing_token: u64 = tx
1239 .query_row(
1240 "SELECT lease_fencing_token FROM process_leases WHERE process_id = ?1",
1241 params![process_id],
1242 |row| row.get::<_, i64>(0),
1243 )
1244 .optional()
1245 .map_err(process_sqlite_error)?
1246 .unwrap_or(0) as u64
1247 + 1;
1248 Ok(ProcessLeaseClaimOutcome::Acquired(
1249 Self::acquire_process_lease_conn(
1250 tx,
1251 &process_id,
1252 &owner,
1253 fencing_token,
1254 now,
1255 lease_ttl_ms,
1256 )?,
1257 ))
1258 })()))
1259 })
1260 .await
1261 .map_err(process_sqlite_error)?
1262 }
1263
1264 async fn reclaim_process_lease(
1265 &self,
1266 process_id: &str,
1267 owner: &LeaseOwnerIdentity,
1268 observed_holder: &ProcessLease,
1269 lease_ttl_ms: u64,
1270 ) -> Result<ProcessLeaseClaimOutcome, lash_core::PluginError> {
1271 let process_id = process_id.to_string();
1272 let owner = owner.clone();
1273 let observed_holder = observed_holder.clone();
1274 self.conn
1275 .write_flow(move |tx| {
1276 Ok(tx_outcome((|| {
1277 if Self::load_process_conn(tx, &process_id)?.is_none() {
1278 return Err(lash_core::PluginError::Session(format!(
1279 "unknown process `{process_id}`"
1280 )));
1281 }
1282 let now = current_epoch_ms();
1283 let current = Self::load_process_lease_conn(tx, &process_id)?;
1284 let Some(current) = current else {
1285 let fencing_token: u64 = tx
1288 .query_row(
1289 "SELECT lease_fencing_token FROM process_leases WHERE process_id = ?1",
1290 params![process_id],
1291 |row| row.get::<_, i64>(0),
1292 )
1293 .optional()
1294 .map_err(process_sqlite_error)?
1295 .unwrap_or(0) as u64
1296 + 1;
1297 return Ok(ProcessLeaseClaimOutcome::Acquired(
1298 Self::acquire_process_lease_conn(
1299 tx,
1300 &process_id,
1301 &owner,
1302 fencing_token,
1303 now,
1304 lease_ttl_ms,
1305 )?,
1306 ));
1307 };
1308 if current.expires_at_epoch_ms <= now {
1309 return Ok(ProcessLeaseClaimOutcome::Acquired(
1310 Self::acquire_process_lease_conn(
1311 tx,
1312 &process_id,
1313 &owner,
1314 current.fencing_token.saturating_add(1),
1315 now,
1316 lease_ttl_ms,
1317 )?,
1318 ));
1319 }
1320 if observed_holder.process_id == process_id
1324 && current.owner.same_incarnation(&observed_holder.owner)
1325 && current.lease_token == observed_holder.lease_token
1326 && current.fencing_token == observed_holder.fencing_token
1327 && current.owner.is_definitely_dead_for_claimant(&owner)
1328 {
1329 let fencing_token = current.fencing_token.saturating_add(1);
1330 let lease = ProcessLease {
1331 schema_version: PROCESS_LEASE_SCHEMA_VERSION,
1332 process_id: process_id.clone(),
1333 owner: owner.clone(),
1334 lease_token: format!(
1335 "{:x}",
1336 Sha256::digest(
1337 format!(
1338 "{process_id}:{}:{}:{now}:{fencing_token}",
1339 owner.owner_id, owner.incarnation_id
1340 )
1341 .as_bytes()
1342 )
1343 ),
1344 fencing_token,
1345 claimed_at_epoch_ms: now,
1346 expires_at_epoch_ms: now.saturating_add(lease_ttl_ms),
1347 };
1348 let changed = tx
1349 .execute(
1350 "UPDATE process_leases
1351 SET lease_owner_id = ?1,
1352 lease_owner_incarnation_id = ?2,
1353 lease_owner_liveness_json = ?3,
1354 lease_token = ?4,
1355 lease_fencing_token = ?5,
1356 lease_claimed_at_ms = ?6,
1357 lease_expires_at_ms = ?7
1358 WHERE process_id = ?8
1359 AND lease_owner_id = ?9
1360 AND lease_owner_incarnation_id = ?10
1361 AND lease_token = ?11
1362 AND lease_fencing_token = ?12",
1363 params![
1364 lease.owner.owner_id,
1365 lease.owner.incarnation_id,
1366 encode_process_lease_liveness(&lease.owner.liveness)?,
1367 lease.lease_token,
1368 lease.fencing_token as i64,
1369 lease.claimed_at_epoch_ms as i64,
1370 lease.expires_at_epoch_ms as i64,
1371 process_id,
1372 observed_holder.owner.owner_id,
1373 observed_holder.owner.incarnation_id,
1374 observed_holder.lease_token,
1375 observed_holder.fencing_token as i64,
1376 ],
1377 )
1378 .map_err(process_sqlite_error)?;
1379 if changed == 1 {
1380 return Ok(ProcessLeaseClaimOutcome::Acquired(lease));
1381 }
1382 if let Some(current) = Self::load_process_lease_conn(tx, &process_id)?
1384 && current.expires_at_epoch_ms > now
1385 {
1386 return Ok(ProcessLeaseClaimOutcome::Busy { holder: current });
1387 }
1388 return Err(process_lease_expired(&process_id));
1389 }
1390 Ok(ProcessLeaseClaimOutcome::Busy { holder: current })
1391 })()))
1392 })
1393 .await
1394 .map_err(process_sqlite_error)?
1395 }
1396
1397 async fn renew_process_lease(
1398 &self,
1399 lease: &ProcessLease,
1400 lease_ttl_ms: u64,
1401 ) -> Result<ProcessLease, lash_core::PluginError> {
1402 let lease = lease.clone();
1403 self.conn
1404 .write_flow(move |tx| {
1405 Ok(tx_outcome((|| {
1406 let now = current_epoch_ms();
1407 let current = Self::load_process_lease_conn(tx, &lease.process_id)?;
1408 if !guard_lease(current.as_ref(), &lease.lease_token, now)
1409 || !current.as_ref().is_some_and(|current| {
1410 current.owner.same_incarnation(&lease.owner)
1411 && current.fencing_token == lease.fencing_token
1412 })
1413 {
1414 return Err(process_lease_expired(&lease.process_id));
1415 }
1416 let renewed = ProcessLease {
1417 expires_at_epoch_ms: now.saturating_add(lease_ttl_ms),
1418 ..lease.clone()
1419 };
1420 tx.execute(
1421 "UPDATE process_leases
1422 SET lease_expires_at_ms = ?2
1423 WHERE process_id = ?1 AND lease_token = ?3",
1424 params![
1425 renewed.process_id.as_str(),
1426 renewed.expires_at_epoch_ms as i64,
1427 renewed.lease_token.as_str(),
1428 ],
1429 )
1430 .map_err(process_sqlite_error)?;
1431 Ok(renewed)
1432 })()))
1433 })
1434 .await
1435 .map_err(process_sqlite_error)?
1436 }
1437
1438 async fn get_process_lease(
1439 &self,
1440 process_id: &str,
1441 ) -> Result<Option<ProcessLease>, lash_core::PluginError> {
1442 let process_id = process_id.to_string();
1443 self.conn
1444 .call(move |conn| Ok(Self::load_process_lease_conn(conn, &process_id)))
1445 .await
1446 .map_err(process_sqlite_error)?
1447 }
1448
1449 async fn complete_process_lease(
1450 &self,
1451 completion: &ProcessLeaseCompletion,
1452 ) -> Result<(), lash_core::PluginError> {
1453 let process_id = completion.process_id.clone();
1454 let lease_token = completion.lease_token.clone();
1455 self.conn
1456 .call(move |conn| {
1457 conn.execute(
1458 "UPDATE process_leases
1459 SET lease_owner_id = NULL,
1460 lease_token = NULL,
1461 lease_claimed_at_ms = 0,
1462 lease_expires_at_ms = 0
1463 WHERE process_id = ?1 AND lease_token = ?2",
1464 params![process_id, lease_token],
1465 )
1466 })
1467 .await
1468 .map_err(process_sqlite_error)?;
1469 Ok(())
1470 }
1471
1472 async fn prune_terminal_processes(
1473 &self,
1474 cutoff_epoch_ms: u64,
1475 ) -> Result<ProcessPruneReport, lash_core::PluginError> {
1476 let cutoff = cutoff_epoch_ms as i64;
1477 self.conn
1478 .write_flow(move |tx| {
1479 Ok(tx_outcome((|| {
1480 const SELECTOR: &str = "process_id IN (
1485 SELECT process_id FROM processes
1486 WHERE status != 'running' AND updated_at_ms < ?1
1487 )";
1488 let pruned_events = tx
1489 .execute(
1490 &format!("DELETE FROM process_events WHERE {SELECTOR}"),
1491 params![cutoff],
1492 )
1493 .map_err(process_sqlite_error)?;
1494 tx.execute(
1495 &format!("DELETE FROM process_wake_acks WHERE {SELECTOR}"),
1496 params![cutoff],
1497 )
1498 .map_err(process_sqlite_error)?;
1499 tx.execute(
1500 &format!("DELETE FROM process_handle_grants WHERE {SELECTOR}"),
1501 params![cutoff],
1502 )
1503 .map_err(process_sqlite_error)?;
1504 tx.execute(
1505 &format!("DELETE FROM process_leases WHERE {SELECTOR}"),
1506 params![cutoff],
1507 )
1508 .map_err(process_sqlite_error)?;
1509 let pruned_processes = tx
1510 .execute(
1511 "DELETE FROM processes
1512 WHERE status != 'running' AND updated_at_ms < ?1",
1513 params![cutoff],
1514 )
1515 .map_err(process_sqlite_error)?;
1516 Ok(ProcessPruneReport {
1517 pruned_processes,
1518 pruned_events,
1519 })
1520 })()))
1521 })
1522 .await
1523 .map_err(process_sqlite_error)?
1524 }
1525}
1526
1527fn process_lease_expired(process_id: &str) -> lash_core::PluginError {
1529 lash_core::PluginError::Session(format!(
1530 "process lease for `{process_id}` is missing or expired"
1531 ))
1532}
1533
1534fn process_lease_owner_from_columns(
1535 owner_id: String,
1536 incarnation_id: Option<String>,
1537 liveness_json: Option<String>,
1538) -> LeaseOwnerIdentity {
1539 LeaseOwnerIdentity {
1540 incarnation_id: incarnation_id.unwrap_or_else(|| owner_id.clone()),
1541 owner_id,
1542 liveness: liveness_json
1543 .as_deref()
1544 .and_then(|json| serde_json::from_str(json).ok())
1545 .unwrap_or(LeaseOwnerLiveness::Opaque),
1546 }
1547}
1548
1549fn encode_process_lease_liveness(
1550 liveness: &LeaseOwnerLiveness,
1551) -> Result<String, lash_core::PluginError> {
1552 serde_json::to_string(liveness).map_err(|err| {
1553 lash_core::PluginError::Session(format!("failed to encode process lease liveness: {err}"))
1554 })
1555}
1556
1557fn process_external_ref_conflict(
1558 process_id: &str,
1559 existing: &ProcessExternalRef,
1560 new: &ProcessExternalRef,
1561) -> lash_core::PluginError {
1562 lash_core::PluginError::Session(format!(
1563 "process `{process_id}` external ref conflict: existing {existing:?}, new {new:?}"
1564 ))
1565}