1use std::collections::BTreeMap;
2use std::fmt;
3use std::sync::Arc;
4use std::sync::Mutex;
5
6use serde::{Deserialize, Serialize};
7
8use super::events::{
9 ProcessAwaitOutput, ProcessEventType, ProcessTerminalSemantics, ProcessTerminalState,
10 default_process_event_types,
11};
12use super::validation::{
13 ensure_core_event_types, process_registration_hash, validate_process_registration,
14};
15
16pub type ProcessId = String;
17
18#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize)]
19#[serde(transparent)]
20pub struct SessionScopeId(String);
21
22impl SessionScopeId {
23 pub fn new(value: impl Into<String>) -> Self {
24 Self(value.into())
25 }
26
27 pub fn as_str(&self) -> &str {
28 &self.0
29 }
30}
31
32impl fmt::Display for SessionScopeId {
33 fn fmt(&self, formatter: &mut fmt::Formatter<'_>) -> fmt::Result {
34 formatter.write_str(&self.0)
35 }
36}
37
38impl From<String> for SessionScopeId {
39 fn from(value: String) -> Self {
40 Self::new(value)
41 }
42}
43
44impl From<&str> for SessionScopeId {
45 fn from(value: &str) -> Self {
46 Self::new(value)
47 }
48}
49
50#[derive(Debug, Serialize, Deserialize)]
59#[serde(tag = "type", rename_all = "snake_case")]
60pub enum ProcessInput {
61 ToolCall {
62 call: crate::PreparedToolCall,
63 },
64 Engine {
65 kind: String,
66 #[serde(default)]
67 payload: serde_json::Value,
68 },
69 SessionTurn {
70 create_request: Box<crate::SessionCreateRequest>,
71 turn_input: Box<crate::TurnInput>,
72 output_contract: crate::ToolOutputContract,
73 },
74 External {
75 #[serde(default)]
76 metadata: serde_json::Value,
77 },
78}
79
80impl Clone for ProcessInput {
81 fn clone(&self) -> Self {
82 match self {
83 Self::ToolCall { call } => Self::ToolCall { call: call.clone() },
84 Self::Engine { kind, payload } => Self::Engine {
85 kind: kind.clone(),
86 payload: payload.clone(),
87 },
88 Self::SessionTurn {
89 create_request,
90 turn_input,
91 output_contract,
92 } => Self::SessionTurn {
93 create_request: create_request.clone(),
94 turn_input: turn_input.clone(),
95 output_contract: output_contract.clone(),
96 },
97 Self::External { metadata } => Self::External {
98 metadata: metadata.clone(),
99 },
100 }
101 }
102}
103
104impl PartialEq for ProcessInput {
105 fn eq(&self, other: &Self) -> bool {
106 serde_json::to_value(self).ok() == serde_json::to_value(other).ok()
107 }
108}
109
110impl ProcessInput {
111 pub fn engine_kind(&self) -> &'static str {
112 match self {
113 Self::ToolCall { .. } => "tool",
114 Self::Engine { .. } => "engine",
115 Self::SessionTurn { .. } => "session_turn",
116 Self::External { .. } => "external",
117 }
118 }
119
120 pub fn engine_specific_kind(&self) -> Option<&str> {
121 match self {
122 Self::Engine { kind, .. } => Some(kind.as_str()),
123 _ => None,
124 }
125 }
126}
127
128#[derive(Clone, Copy, Debug, PartialEq, Eq, Serialize, Deserialize)]
136#[serde(rename_all = "snake_case")]
137pub enum RecoveryDisposition {
138 Rerunnable,
141 OwnerBound,
145 ExternallyOwned,
148}
149
150#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
155pub struct ProcessStarted {
156 pub owner: crate::LeaseOwnerIdentity,
157 pub started_at_ms: u64,
158}
159
160#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
166pub struct AbandonRequest {
167 pub requested_by: String,
168 pub requested_at_ms: u64,
169 #[serde(default, skip_serializing_if = "Option::is_none")]
170 pub reason: Option<String>,
171}
172
173#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
174#[serde(transparent)]
175pub struct ProcessExecutionEnvRef(String);
176
177impl ProcessExecutionEnvRef {
178 pub fn new(value: impl Into<String>) -> Self {
179 Self(value.into())
180 }
181
182 pub fn as_str(&self) -> &str {
183 &self.0
184 }
185}
186
187impl fmt::Display for ProcessExecutionEnvRef {
188 fn fmt(&self, formatter: &mut fmt::Formatter<'_>) -> fmt::Result {
189 formatter.write_str(&self.0)
190 }
191}
192
193#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
194#[serde(deny_unknown_fields)]
195pub struct ProcessExecutionEnvSpec {
196 #[serde(default)]
197 pub plugin_options: crate::PluginOptions,
198 #[serde(default)]
199 pub policy: crate::SessionPolicy,
200}
201
202impl ProcessExecutionEnvSpec {
203 pub fn new(plugin_options: crate::PluginOptions, policy: crate::SessionPolicy) -> Self {
204 Self {
205 plugin_options,
206 policy,
207 }
208 }
209
210 pub fn stable_ref(&self) -> Result<ProcessExecutionEnvRef, serde_json::Error> {
211 crate::stable_hash::stable_json_sha256_hex(self)
212 .map(|hash| ProcessExecutionEnvRef::new(format!("process-env:sha256:{hash}")))
213 }
214
215 pub fn to_store_bytes(&self) -> Result<Vec<u8>, serde_json::Error> {
216 serde_json::to_vec(self)
217 }
218
219 pub fn from_store_bytes(bytes: &[u8]) -> Result<Self, serde_json::Error> {
220 serde_json::from_slice(bytes)
221 }
222}
223
224#[async_trait::async_trait]
225pub trait ProcessExecutionEnvStore: Send + Sync {
226 fn durability_tier(&self) -> crate::DurabilityTier {
227 crate::DurabilityTier::Inline
228 }
229
230 async fn put_process_execution_env(
231 &self,
232 env_ref: &ProcessExecutionEnvRef,
233 bytes: &[u8],
234 ) -> Result<(), crate::PluginError>;
235
236 async fn get_process_execution_env(
237 &self,
238 env_ref: &ProcessExecutionEnvRef,
239 ) -> Result<Option<Vec<u8>>, crate::PluginError>;
240}
241
242#[derive(Default)]
243pub struct InMemoryProcessExecutionEnvStore {
244 envs: Mutex<BTreeMap<String, Vec<u8>>>,
245}
246
247impl InMemoryProcessExecutionEnvStore {
248 pub fn new() -> Self {
249 Self::default()
250 }
251}
252
253#[async_trait::async_trait]
254impl ProcessExecutionEnvStore for InMemoryProcessExecutionEnvStore {
255 async fn put_process_execution_env(
256 &self,
257 env_ref: &ProcessExecutionEnvRef,
258 bytes: &[u8],
259 ) -> Result<(), crate::PluginError> {
260 self.envs
261 .lock()
262 .map_err(|_| {
263 crate::PluginError::Session("process execution env store lock poisoned".to_string())
264 })?
265 .insert(env_ref.as_str().to_string(), bytes.to_vec());
266 Ok(())
267 }
268
269 async fn get_process_execution_env(
270 &self,
271 env_ref: &ProcessExecutionEnvRef,
272 ) -> Result<Option<Vec<u8>>, crate::PluginError> {
273 Ok(self
274 .envs
275 .lock()
276 .map_err(|_| {
277 crate::PluginError::Session("process execution env store lock poisoned".to_string())
278 })?
279 .get(env_ref.as_str())
280 .cloned())
281 }
282}
283
284pub async fn persist_process_execution_env(
285 env_store: &dyn ProcessExecutionEnvStore,
286 spec: &ProcessExecutionEnvSpec,
287) -> Result<ProcessExecutionEnvRef, crate::PluginError> {
288 let env_ref = spec.stable_ref().map_err(|err| {
289 crate::PluginError::Session(format!("failed to hash process execution env: {err}"))
290 })?;
291 let bytes = spec.to_store_bytes().map_err(|err| {
292 crate::PluginError::Session(format!("failed to encode process execution env: {err}"))
293 })?;
294 env_store
295 .put_process_execution_env(&env_ref, &bytes)
296 .await?;
297 Ok(env_ref)
298}
299
300pub async fn load_process_execution_env(
301 env_store: &dyn ProcessExecutionEnvStore,
302 env_ref: &ProcessExecutionEnvRef,
303) -> Result<ProcessExecutionEnvSpec, crate::PluginError> {
304 let bytes = env_store
305 .get_process_execution_env(env_ref)
306 .await?
307 .ok_or_else(|| {
308 crate::PluginError::Session(format!("missing process execution env `{env_ref}`"))
309 })?;
310 ProcessExecutionEnvSpec::from_store_bytes(&bytes).map_err(|err| {
311 crate::PluginError::Session(format!(
312 "failed to decode process execution env `{env_ref}`: {err}"
313 ))
314 })
315}
316
317#[derive(Clone, Debug, Default, Serialize, Deserialize)]
320pub struct ProcessExecutionContext {
321 #[serde(default, skip_serializing_if = "Option::is_none")]
322 pub causal_invocation: Option<crate::RuntimeInvocation>,
323}
324
325impl ProcessExecutionContext {
326 pub fn with_causal_invocation(mut self, invocation: Option<crate::RuntimeInvocation>) -> Self {
327 self.causal_invocation = invocation;
328 self
329 }
330
331 pub fn is_empty(&self) -> bool {
332 self.causal_invocation.is_none()
333 }
334}
335
336#[derive(Clone)]
337pub struct ProcessOpScope<'scope> {
338 pub(crate) parent_invocation: Option<crate::RuntimeInvocation>,
339 pub(crate) effect_controller: crate::runtime::RuntimeEffectControllerHandle<'scope>,
340 pub(crate) agent_frame_id: Option<crate::AgentFrameId>,
341 pub(crate) target_agent_frame_id: Option<crate::AgentFrameId>,
342}
343
344impl<'scope> ProcessOpScope<'scope> {
345 pub fn new(scoped_effect_controller: crate::ScopedEffectController<'scope>) -> Self {
346 Self {
347 parent_invocation: None,
348 effect_controller: crate::runtime::RuntimeEffectControllerHandle::borrowed(
349 scoped_effect_controller,
350 ),
351 agent_frame_id: None,
352 target_agent_frame_id: None,
353 }
354 }
355
356 pub fn with_parent_invocation(
357 mut self,
358 parent_invocation: Option<crate::RuntimeInvocation>,
359 ) -> Self {
360 self.parent_invocation = parent_invocation;
361 self
362 }
363
364 pub fn with_agent_frame_id(mut self, agent_frame_id: Option<crate::AgentFrameId>) -> Self {
365 self.agent_frame_id = agent_frame_id;
366 self
367 }
368
369 pub fn with_target_agent_frame_id(
370 mut self,
371 agent_frame_id: Option<crate::AgentFrameId>,
372 ) -> Self {
373 self.target_agent_frame_id = agent_frame_id;
374 self
375 }
376
377 pub fn agent_frame_id(&self) -> Option<&str> {
378 self.agent_frame_id.as_deref()
379 }
380
381 pub fn target_agent_frame_id(&self) -> Option<&str> {
382 self.target_agent_frame_id.as_deref()
383 }
384
385 pub(crate) fn controller(&self) -> &dyn crate::RuntimeEffectController {
386 self.effect_controller.controller()
387 }
388}
389
390#[derive(Clone, Debug, Default)]
391pub struct ProcessStartOptions {
392 pub descriptor: Option<ProcessHandleDescriptor>,
393 pub spawn_provenance: Option<ProcessSpawnProvenance>,
401}
402
403#[derive(Clone, Debug, PartialEq, Eq)]
408pub struct ProcessSpawnProvenance {
409 pub originator: ProcessOriginator,
410 pub wake_target: Option<SessionScope>,
411}
412
413impl ProcessStartOptions {
414 pub fn new() -> Self {
415 Self::default()
416 }
417
418 pub fn with_descriptor(mut self, descriptor: ProcessHandleDescriptor) -> Self {
419 self.descriptor = Some(descriptor);
420 self
421 }
422
423 pub fn with_optional_descriptor(mut self, descriptor: Option<ProcessHandleDescriptor>) -> Self {
424 self.descriptor = descriptor;
425 self
426 }
427
428 pub fn with_spawn_provenance(mut self, spawn_provenance: ProcessSpawnProvenance) -> Self {
429 self.spawn_provenance = Some(spawn_provenance);
430 self
431 }
432
433 pub fn execution_context(&self, scope: &ProcessOpScope<'_>) -> ProcessExecutionContext {
434 ProcessExecutionContext {
435 causal_invocation: scope.parent_invocation.clone(),
436 }
437 }
438}
439
440#[derive(Clone, Debug, Serialize, Deserialize)]
442pub struct ProcessStartRequest {
443 pub id: ProcessId,
444 pub input: ProcessInput,
445 pub disposition: RecoveryDisposition,
446 #[serde(default, skip_serializing_if = "Option::is_none")]
447 pub env_spec: Option<ProcessExecutionEnvSpec>,
448 pub originator: ProcessOriginator,
449 #[serde(default, skip_serializing_if = "Option::is_none")]
450 pub wake_target: Option<SessionScope>,
451 #[serde(default, skip_serializing_if = "Option::is_none")]
452 pub grant: Option<ProcessStartGrant>,
453 #[serde(default)]
454 pub event_types: Vec<ProcessEventType>,
455}
456
457impl ProcessStartRequest {
458 pub fn new(
459 id: impl Into<ProcessId>,
460 input: ProcessInput,
461 disposition: RecoveryDisposition,
462 originator: ProcessOriginator,
463 ) -> Self {
464 Self {
465 id: id.into(),
466 input,
467 disposition,
468 env_spec: None,
469 originator,
470 wake_target: None,
471 grant: None,
472 event_types: default_process_event_types(),
473 }
474 }
475
476 pub fn external(
479 id: impl Into<ProcessId>,
480 originator: ProcessOriginator,
481 metadata: serde_json::Value,
482 ) -> Self {
483 Self::new(
484 id,
485 ProcessInput::External { metadata },
486 RecoveryDisposition::ExternallyOwned,
487 originator,
488 )
489 }
490
491 pub fn with_env_spec(mut self, env_spec: ProcessExecutionEnvSpec) -> Self {
492 self.env_spec = Some(env_spec);
493 self
494 }
495
496 pub fn with_wake_target(mut self, wake_target: Option<SessionScope>) -> Self {
497 self.wake_target = wake_target;
498 self
499 }
500
501 pub fn with_grant(mut self, grant: Option<ProcessStartGrant>) -> Self {
502 self.grant = grant;
503 self
504 }
505
506 pub fn with_event_types(
507 mut self,
508 event_types: impl IntoIterator<Item = ProcessEventType>,
509 ) -> Self {
510 self.event_types = event_types.into_iter().collect();
511 self
512 }
513
514 pub fn with_extra_event_types(
515 mut self,
516 event_types: impl IntoIterator<Item = ProcessEventType>,
517 ) -> Self {
518 self.event_types.extend(event_types);
519 self
520 }
521
522 pub fn into_registration(self, env_ref: Option<ProcessExecutionEnvRef>) -> ProcessRegistration {
523 ProcessRegistration::new(
524 self.id,
525 self.input,
526 self.disposition,
527 ProcessProvenance::new(self.originator),
528 )
529 .with_event_types(self.event_types)
530 .with_execution_env_ref(env_ref)
531 .with_wake_target(self.wake_target)
532 }
533}
534
535#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
536pub struct SessionScope {
537 pub session_id: String,
538 #[serde(default, skip_serializing_if = "Option::is_none")]
539 pub agent_frame_id: Option<crate::AgentFrameId>,
540}
541
542#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
543pub struct ProcessProvenance {
544 pub originator: ProcessOriginator,
545 #[serde(default, skip_serializing_if = "Option::is_none")]
546 pub caused_by: Option<crate::CausalRef>,
547}
548
549impl ProcessProvenance {
550 pub fn new(originator: ProcessOriginator) -> Self {
551 Self {
552 originator,
553 caused_by: None,
554 }
555 }
556
557 pub fn host() -> Self {
558 Self::new(ProcessOriginator::host())
559 }
560
561 pub fn session(scope: SessionScope) -> Self {
562 Self::new(ProcessOriginator::session(scope))
563 }
564
565 pub fn with_caused_by(mut self, caused_by: Option<crate::CausalRef>) -> Self {
566 self.caused_by = caused_by;
567 self
568 }
569}
570
571#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
572#[serde(tag = "type", rename_all = "snake_case")]
573pub enum ProcessOriginator {
574 Host,
575 Session { scope: SessionScope },
576}
577
578impl ProcessOriginator {
579 pub fn host() -> Self {
580 Self::Host
581 }
582
583 pub fn session(scope: SessionScope) -> Self {
584 Self::Session { scope }
585 }
586
587 pub fn scope_id(&self) -> String {
588 match self {
589 Self::Host => "host".to_string(),
590 Self::Session { scope } => scope.id().to_string(),
591 }
592 }
593}
594
595impl SessionScope {
596 pub fn new(session_id: impl Into<String>) -> Self {
597 Self {
598 session_id: session_id.into(),
599 agent_frame_id: None,
600 }
601 }
602
603 pub fn for_agent_frame(
604 session_id: impl Into<String>,
605 agent_frame_id: impl Into<crate::AgentFrameId>,
606 ) -> Self {
607 Self {
608 session_id: session_id.into(),
609 agent_frame_id: Some(agent_frame_id.into()),
610 }
611 }
612
613 pub fn id(&self) -> SessionScopeId {
614 match self.agent_frame_id.as_deref() {
615 Some(frame_id) if !frame_id.is_empty() => {
616 SessionScopeId::new(format!("session:{}/frame:{frame_id}", self.session_id))
617 }
618 _ => SessionScopeId::new(format!("session:{}", self.session_id)),
619 }
620 }
621
622 pub fn is_empty(&self) -> bool {
623 self.session_id.is_empty()
624 }
625}
626
627#[derive(Debug, Serialize, Deserialize)]
629pub struct ProcessRegistration {
630 pub id: ProcessId,
631 pub input: Arc<ProcessInput>,
632 pub disposition: RecoveryDisposition,
633 pub identity: ProcessIdentity,
634 #[serde(default)]
635 pub event_types: Vec<ProcessEventType>,
636 pub provenance: ProcessProvenance,
637 #[serde(default, skip_serializing_if = "Option::is_none")]
638 pub env_ref: Option<ProcessExecutionEnvRef>,
639 #[serde(default, skip_serializing_if = "Option::is_none")]
640 pub wake_target: Option<SessionScope>,
641}
642
643impl Clone for ProcessRegistration {
644 fn clone(&self) -> Self {
645 Self {
646 id: self.id.clone(),
647 input: Arc::clone(&self.input),
648 disposition: self.disposition,
649 identity: self.identity.clone(),
650 event_types: self.event_types.clone(),
651 provenance: self.provenance.clone(),
652 env_ref: self.env_ref.clone(),
653 wake_target: self.wake_target.clone(),
654 }
655 }
656}
657
658impl ProcessRegistration {
659 pub fn new(
660 id: impl Into<ProcessId>,
661 input: ProcessInput,
662 disposition: RecoveryDisposition,
663 provenance: ProcessProvenance,
664 ) -> Self {
665 let identity = ProcessIdentity::from_process_input(&input);
666 Self {
667 id: id.into(),
668 input: Arc::new(input),
669 disposition,
670 identity,
671 event_types: default_process_event_types(),
672 provenance,
673 env_ref: None,
674 wake_target: None,
675 }
676 }
677
678 pub(crate) fn session_start_draft(
679 id: impl Into<ProcessId>,
680 input: ProcessInput,
681 disposition: RecoveryDisposition,
682 ) -> Self {
683 Self::new(id, input, disposition, ProcessProvenance::host())
684 }
685
686 pub fn with_process_provenance(mut self, provenance: ProcessProvenance) -> Self {
687 self.provenance = provenance;
688 self
689 }
690
691 pub fn with_execution_env_ref(mut self, env_ref: Option<ProcessExecutionEnvRef>) -> Self {
692 self.env_ref = env_ref;
693 self
694 }
695
696 pub fn with_wake_target(mut self, wake_target: Option<SessionScope>) -> Self {
697 self.wake_target = wake_target;
698 self
699 }
700
701 pub fn with_identity(mut self, identity: ProcessIdentity) -> Self {
702 self.identity = identity;
703 self
704 }
705
706 pub fn with_event_types(
707 mut self,
708 event_types: impl IntoIterator<Item = ProcessEventType>,
709 ) -> Self {
710 self.event_types = event_types.into_iter().collect();
711 self
712 }
713
714 pub fn with_extra_event_types(
715 mut self,
716 event_types: impl IntoIterator<Item = ProcessEventType>,
717 ) -> Self {
718 self.event_types.extend(event_types);
719 self
720 }
721}
722
723#[derive(Clone, Debug, Default, PartialEq, Serialize, Deserialize)]
724#[serde(tag = "state", rename_all = "snake_case")]
725pub enum ProcessStatus {
726 #[default]
727 Running,
728 Completed {
729 await_output: ProcessAwaitOutput,
730 },
731 Failed {
732 await_output: ProcessAwaitOutput,
733 },
734 Cancelled {
735 await_output: ProcessAwaitOutput,
736 },
737 Abandoned {
738 await_output: ProcessAwaitOutput,
739 },
740}
741
742impl ProcessStatus {
743 pub fn from_terminal(terminal: ProcessTerminalSemantics) -> Self {
744 match terminal.state {
745 ProcessTerminalState::Completed => Self::Completed {
746 await_output: terminal.await_output,
747 },
748 ProcessTerminalState::Failed => Self::Failed {
749 await_output: terminal.await_output,
750 },
751 ProcessTerminalState::Cancelled => Self::Cancelled {
752 await_output: terminal.await_output,
753 },
754 ProcessTerminalState::Abandoned => Self::Abandoned {
755 await_output: terminal.await_output,
756 },
757 }
758 }
759
760 pub fn is_terminal(&self) -> bool {
761 !matches!(self, Self::Running)
762 }
763
764 pub fn label(&self) -> &'static str {
765 match self {
766 Self::Running => "running",
767 Self::Completed { .. } => "completed",
768 Self::Failed { .. } => "failed",
769 Self::Cancelled { .. } => "cancelled",
770 Self::Abandoned { .. } => "abandoned",
771 }
772 }
773
774 pub fn terminal_state(&self) -> Option<ProcessTerminalState> {
775 match self {
776 Self::Running => None,
777 Self::Completed { .. } => Some(ProcessTerminalState::Completed),
778 Self::Failed { .. } => Some(ProcessTerminalState::Failed),
779 Self::Cancelled { .. } => Some(ProcessTerminalState::Cancelled),
780 Self::Abandoned { .. } => Some(ProcessTerminalState::Abandoned),
781 }
782 }
783
784 pub fn await_output(&self) -> Option<&ProcessAwaitOutput> {
785 match self {
786 Self::Running => None,
787 Self::Completed { await_output }
788 | Self::Failed { await_output }
789 | Self::Cancelled { await_output }
790 | Self::Abandoned { await_output } => Some(await_output),
791 }
792 }
793
794 pub fn terminal_semantics(&self) -> Option<ProcessTerminalSemantics> {
795 Some(ProcessTerminalSemantics {
796 state: self.terminal_state()?,
797 await_output: self.await_output()?.clone(),
798 })
799 }
800}
801
802#[derive(Clone, Debug, Serialize, Deserialize)]
805pub struct ProcessRecord {
806 pub id: ProcessId,
807 pub registration_hash: String,
808 pub input: Arc<ProcessInput>,
809 pub disposition: RecoveryDisposition,
813 pub identity: ProcessIdentity,
814 #[serde(default)]
815 pub event_types: Vec<ProcessEventType>,
816 pub provenance: ProcessProvenance,
817 #[serde(default, skip_serializing_if = "Option::is_none")]
818 pub env_ref: Option<ProcessExecutionEnvRef>,
819 #[serde(default, skip_serializing_if = "Option::is_none")]
820 pub wake_target: Option<SessionScope>,
821 #[serde(default)]
822 pub created_at_ms: u64,
823 #[serde(default)]
824 pub updated_at_ms: u64,
825 #[serde(default, skip_serializing_if = "Option::is_none")]
826 pub external_ref: Option<ProcessExternalRef>,
827 #[serde(default, skip_serializing_if = "Option::is_none")]
833 pub first_started: Option<Box<ProcessStarted>>,
834 #[serde(default, skip_serializing_if = "Option::is_none")]
836 pub abandon_request: Option<Box<AbandonRequest>>,
837 #[serde(default, skip_serializing_if = "Option::is_none")]
838 pub wait: Option<WaitState>,
839 #[serde(default)]
840 pub status: ProcessStatus,
841}
842
843#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
844pub struct WaitState {
845 pub kind: WaitKind,
846 pub since_ms: u64,
847}
848
849#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
850#[serde(tag = "kind", rename_all = "snake_case")]
851pub enum WaitKind {
852 Signal {
853 name: String,
854 event_type: String,
855 key: String,
856 ordinal: u64,
857 },
858}
859
860impl ProcessRecord {
861 pub fn from_registration(registration: ProcessRegistration) -> Self {
862 Self::from_registration_with_clock(registration, &crate::SystemClock)
863 }
864
865 pub fn from_registration_with_clock(
866 mut registration: ProcessRegistration,
867 clock: &dyn crate::Clock,
868 ) -> Self {
869 ensure_core_event_types(&mut registration);
870 validate_process_registration(®istration)
871 .expect("process registration should be valid before record construction");
872 let registration_hash = process_registration_hash(®istration)
873 .expect("process registration should hash before record construction");
874 Self::from_prepared_registration(registration, registration_hash, clock.timestamp_ms())
875 }
876
877 pub fn from_prepared_registration(
878 registration: ProcessRegistration,
879 registration_hash: String,
880 now_ms: u64,
881 ) -> Self {
882 Self {
883 id: registration.id,
884 registration_hash,
885 input: registration.input,
886 disposition: registration.disposition,
887 identity: registration.identity,
888 event_types: registration.event_types,
889 provenance: registration.provenance,
890 env_ref: registration.env_ref,
891 wake_target: registration.wake_target,
892 created_at_ms: now_ms,
893 updated_at_ms: now_ms,
894 external_ref: None,
895 first_started: None,
896 abandon_request: None,
897 wait: None,
898 status: ProcessStatus::Running,
899 }
900 }
901
902 pub fn is_terminal(&self) -> bool {
903 self.status.is_terminal()
904 }
905
906 pub fn clear_wake_target_for_session(&mut self, session_id: &str) -> bool {
907 self.clear_wake_target_for_session_with_clock(session_id, &crate::SystemClock)
908 }
909
910 pub fn clear_wake_target_for_session_with_clock(
911 &mut self,
912 session_id: &str,
913 clock: &dyn crate::Clock,
914 ) -> bool {
915 let should_clear = self
916 .wake_target
917 .as_ref()
918 .is_some_and(|scope| scope.session_id == session_id);
919 if should_clear {
920 self.wake_target = None;
921 self.updated_at_ms = clock.timestamp_ms();
922 }
923 should_clear
924 }
925
926 pub fn originator_scope_id(&self) -> String {
927 self.provenance.originator.scope_id()
928 }
929}
930
931#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
938pub struct ProcessIdentity {
939 pub kind: String,
940 #[serde(default, skip_serializing_if = "Option::is_none")]
941 pub label: Option<String>,
942 #[serde(default, skip_serializing_if = "Option::is_none")]
943 pub definition: Option<serde_json::Value>,
944}
945
946impl ProcessIdentity {
947 pub fn new(kind: impl Into<String>) -> Self {
948 Self {
949 kind: kind.into(),
950 label: None,
951 definition: None,
952 }
953 }
954
955 pub fn with_label(mut self, label: Option<impl Into<String>>) -> Self {
956 self.label = label.map(Into::into);
957 self
958 }
959
960 pub fn with_definition(mut self, definition: Option<serde_json::Value>) -> Self {
961 self.definition = definition;
962 self
963 }
964
965 pub fn from_process_input(input: &ProcessInput) -> Self {
966 match input {
967 ProcessInput::ToolCall { call } => {
968 Self::new("tool").with_label(Some(call.tool_name.clone()))
969 }
970 ProcessInput::Engine { kind, .. } => Self::new(kind.clone()),
971 ProcessInput::SessionTurn { create_request, .. } => {
972 let label = create_request
973 .subagent
974 .as_ref()
975 .map(|subagent| subagent.capability.clone())
976 .or_else(|| create_request.usage_source.clone())
977 .or_else(|| create_request.session_id.clone());
978 Self::new("session_turn").with_label(label)
979 }
980 ProcessInput::External { metadata } => {
981 let label = metadata
982 .get("label")
983 .or_else(|| metadata.get("name"))
984 .or_else(|| metadata.get("title"))
985 .and_then(serde_json::Value::as_str)
986 .map(str::to_string);
987 Self::new("external").with_label(label)
988 }
989 }
990 }
991}
992
993pub const PROCESS_LEASE_SCHEMA_VERSION: u32 = 2;
1000
1001#[derive(Clone, Debug, Serialize, Deserialize)]
1021pub struct ProcessLease {
1022 pub schema_version: u32,
1023 pub process_id: ProcessId,
1024 pub owner: crate::LeaseOwnerIdentity,
1025 pub lease_token: String,
1026 pub fencing_token: u64,
1027 pub claimed_at_epoch_ms: u64,
1028 pub expires_at_epoch_ms: u64,
1029}
1030
1031#[derive(Clone, Debug, Serialize, Deserialize)]
1037pub enum ProcessLeaseClaimOutcome {
1038 Acquired(ProcessLease),
1039 Busy { holder: ProcessLease },
1040}
1041
1042impl ProcessLeaseClaimOutcome {
1043 pub fn acquired(self) -> Option<ProcessLease> {
1044 match self {
1045 Self::Acquired(lease) => Some(lease),
1046 Self::Busy { .. } => None,
1047 }
1048 }
1049}
1050
1051#[derive(Clone, Debug, Serialize, Deserialize)]
1052pub struct ProcessLeaseCompletion {
1053 pub process_id: ProcessId,
1054 pub lease_token: String,
1055}
1056
1057impl ProcessLeaseCompletion {
1058 pub fn from_lease(lease: &ProcessLease) -> Self {
1059 Self {
1060 process_id: lease.process_id.clone(),
1061 lease_token: lease.lease_token.clone(),
1062 }
1063 }
1064}
1065
1066#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq)]
1068pub struct ProcessExternalRef {
1069 pub backend: String,
1070 pub id: String,
1071 #[serde(default, skip_serializing_if = "Option::is_none")]
1072 pub metadata: Option<serde_json::Value>,
1073}
1074
1075#[derive(Clone, Debug, Default, PartialEq, Eq, Serialize, Deserialize)]
1076pub struct ProcessHandleDescriptor {
1077 #[serde(default, skip_serializing_if = "Option::is_none")]
1078 pub kind: Option<String>,
1079 #[serde(default, skip_serializing_if = "Option::is_none")]
1080 pub label: Option<String>,
1081}
1082
1083impl ProcessHandleDescriptor {
1084 pub fn new(kind: Option<impl Into<String>>, label: Option<impl Into<String>>) -> Self {
1085 Self {
1086 kind: kind.map(Into::into),
1087 label: label.map(Into::into),
1088 }
1089 }
1090}
1091
1092#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
1093pub struct ProcessHandleGrant {
1094 pub session_id: String,
1095 pub process_id: ProcessId,
1096 pub descriptor: ProcessHandleDescriptor,
1097}
1098
1099pub type ProcessHandleGrantEntry = (ProcessHandleGrant, ProcessRecord);
1100
1101#[derive(Clone, Copy, Debug, Default, PartialEq, Eq, Serialize, Deserialize)]
1102#[serde(rename_all = "snake_case")]
1103pub enum ProcessLifecycleStatus {
1104 #[default]
1105 Running,
1106 Completed,
1107 Failed,
1108 Cancelled,
1109 Abandoned,
1110}
1111
1112impl ProcessLifecycleStatus {
1113 pub fn label(self) -> &'static str {
1114 match self {
1115 Self::Running => "running",
1116 Self::Completed => "completed",
1117 Self::Failed => "failed",
1118 Self::Cancelled => "cancelled",
1119 Self::Abandoned => "abandoned",
1120 }
1121 }
1122
1123 pub fn is_terminal(self) -> bool {
1124 !matches!(self, Self::Running)
1125 }
1126
1127 pub fn terminal_state(self) -> Option<ProcessTerminalState> {
1128 match self {
1129 Self::Running => None,
1130 Self::Completed => Some(ProcessTerminalState::Completed),
1131 Self::Failed => Some(ProcessTerminalState::Failed),
1132 Self::Cancelled => Some(ProcessTerminalState::Cancelled),
1133 Self::Abandoned => Some(ProcessTerminalState::Abandoned),
1134 }
1135 }
1136}
1137
1138impl From<&ProcessStatus> for ProcessLifecycleStatus {
1139 fn from(status: &ProcessStatus) -> Self {
1140 match status {
1141 ProcessStatus::Running => Self::Running,
1142 ProcessStatus::Completed { .. } => Self::Completed,
1143 ProcessStatus::Failed { .. } => Self::Failed,
1144 ProcessStatus::Cancelled { .. } => Self::Cancelled,
1145 ProcessStatus::Abandoned { .. } => Self::Abandoned,
1146 }
1147 }
1148}
1149
1150impl From<ProcessStatus> for ProcessLifecycleStatus {
1151 fn from(status: ProcessStatus) -> Self {
1152 Self::from(&status)
1153 }
1154}
1155
1156#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
1157pub struct ProcessHandleSummary {
1158 #[serde(rename = "__handle__")]
1159 pub handle_type: String,
1160 pub id: ProcessId,
1161 pub process_id: ProcessId,
1162 pub descriptor: ProcessHandleDescriptor,
1163 #[serde(default, skip_serializing_if = "Option::is_none")]
1164 pub definition: Option<serde_json::Value>,
1165 pub status: ProcessLifecycleStatus,
1166}
1167
1168impl ProcessHandleSummary {
1169 pub fn new(
1170 process_id: impl Into<ProcessId>,
1171 descriptor: ProcessHandleDescriptor,
1172 status: ProcessLifecycleStatus,
1173 ) -> Self {
1174 let process_id = process_id.into();
1175 Self {
1176 handle_type: "process".to_string(),
1177 id: process_id.clone(),
1178 process_id,
1179 descriptor,
1180 definition: None,
1181 status,
1182 }
1183 }
1184
1185 pub fn with_definition(mut self, definition: Option<serde_json::Value>) -> Self {
1186 self.definition = definition;
1187 self
1188 }
1189
1190 pub fn from_grant_record(grant: ProcessHandleGrant, record: ProcessRecord) -> Self {
1191 let definition = record.identity.definition.clone();
1192 Self::new(
1193 record.id,
1194 grant.descriptor,
1195 ProcessLifecycleStatus::from(record.status),
1196 )
1197 .with_definition(definition)
1198 }
1199}
1200
1201impl From<ProcessHandleGrantEntry> for ProcessHandleSummary {
1202 fn from((grant, record): ProcessHandleGrantEntry) -> Self {
1203 Self::from_grant_record(grant, record)
1204 }
1205}
1206
1207#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
1208pub struct ProcessCancelSummary {
1209 pub process_id: ProcessId,
1210 pub status: ProcessLifecycleStatus,
1211}
1212
1213impl ProcessCancelSummary {
1214 pub fn from_record(record: ProcessRecord) -> Self {
1215 Self {
1216 process_id: record.id,
1217 status: ProcessLifecycleStatus::from(record.status),
1218 }
1219 }
1220}
1221
1222#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)]
1223pub enum ProcessStatusFilter {
1224 #[default]
1225 Running,
1226 Completed,
1227 Failed,
1228 Cancelled,
1229 Abandoned,
1230 Any,
1231}
1232
1233impl ProcessStatusFilter {
1234 pub fn decode(value: Option<&str>) -> Result<Self, String> {
1235 match value.unwrap_or("running") {
1236 "running" => Ok(Self::Running),
1237 "completed" => Ok(Self::Completed),
1238 "failed" => Ok(Self::Failed),
1239 "cancelled" => Ok(Self::Cancelled),
1240 "abandoned" => Ok(Self::Abandoned),
1241 "any" => Ok(Self::Any),
1242 other => Err(format!(
1243 "processes.list status must be `running`, `completed`, `failed`, `cancelled`, `abandoned`, or `any`, got `{other}`"
1244 )),
1245 }
1246 }
1247
1248 pub fn list_mode(self) -> ProcessListMode {
1249 match self {
1250 Self::Running => ProcessListMode::Live,
1251 Self::Completed | Self::Failed | Self::Cancelled | Self::Abandoned | Self::Any => {
1252 ProcessListMode::All
1253 }
1254 }
1255 }
1256
1257 pub fn matches(self, status: ProcessLifecycleStatus) -> bool {
1258 match self {
1259 Self::Running => status == ProcessLifecycleStatus::Running,
1260 Self::Completed => status == ProcessLifecycleStatus::Completed,
1261 Self::Failed => status == ProcessLifecycleStatus::Failed,
1262 Self::Cancelled => status == ProcessLifecycleStatus::Cancelled,
1263 Self::Abandoned => status == ProcessLifecycleStatus::Abandoned,
1264 Self::Any => true,
1265 }
1266 }
1267}
1268
1269#[derive(Clone, Debug, Default, PartialEq)]
1270pub struct ProcessListFilter {
1271 pub definition: Option<serde_json::Value>,
1272 pub status: ProcessStatusFilter,
1273 pub waiting: Option<bool>,
1274}
1275
1276impl ProcessListFilter {
1277 pub fn decode(args: &serde_json::Value) -> Result<Self, String> {
1278 let map = args
1279 .as_object()
1280 .ok_or_else(|| "processes.list expects a record of process filters".to_string())?;
1281 for key in map.keys() {
1282 match key.as_str() {
1283 "definition" | "status" | "waiting" => {}
1284 _ => return Err(format!("processes.list unknown filter `{key}`")),
1285 }
1286 }
1287 let definition = args.get("definition").cloned();
1288 let status =
1289 ProcessStatusFilter::decode(args.get("status").and_then(serde_json::Value::as_str))?;
1290 let waiting = args
1291 .get("waiting")
1292 .map(|value| {
1293 value
1294 .as_bool()
1295 .ok_or_else(|| "processes.list `waiting` filter must be a boolean".to_string())
1296 })
1297 .transpose()?;
1298 Ok(Self {
1299 definition,
1300 status,
1301 waiting,
1302 })
1303 }
1304
1305 pub fn list_mode(&self) -> ProcessListMode {
1306 self.status.list_mode()
1307 }
1308
1309 pub fn matches_entry(&self, entry: &ProcessHandleGrantEntry) -> bool {
1310 let (_grant, record) = entry;
1311 self.matches_record(record)
1312 }
1313
1314 pub fn matches_record(&self, record: &ProcessRecord) -> bool {
1315 let status = ProcessLifecycleStatus::from(&record.status);
1316 self.status.matches(status)
1317 && self
1318 .definition
1319 .as_ref()
1320 .is_none_or(|definition| record.identity.definition.as_ref() == Some(definition))
1321 && self
1322 .waiting
1323 .is_none_or(|waiting| record.wait.is_some() == waiting)
1324 }
1325}
1326
1327#[derive(Clone, Copy, Debug, Default, PartialEq, Eq, Serialize, Deserialize)]
1328#[serde(rename_all = "snake_case")]
1329pub enum ProcessListMode {
1330 #[default]
1331 Live,
1332 All,
1333}
1334
1335impl ProcessListMode {
1336 pub fn as_str(self) -> &'static str {
1337 match self {
1338 Self::Live => "live",
1339 Self::All => "all",
1340 }
1341 }
1342}
1343
1344#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
1345pub struct ProcessStartGrant {
1346 pub session_scope: SessionScope,
1347 pub descriptor: ProcessHandleDescriptor,
1348}
1349
1350#[derive(Clone, Debug, Default, PartialEq, Eq, Serialize, Deserialize)]
1351pub struct ProcessSessionDeleteReport {
1352 pub session_id: String,
1353 pub revoked_handle_count: usize,
1354 pub deleted_wake_count: usize,
1355 pub orphaned_process_ids: Vec<String>,
1356 pub preserved_process_ids: Vec<String>,
1357}
1358
1359#[cfg(test)]
1360mod tests {
1361 use serde_json::json;
1362
1363 use super::*;
1364
1365 fn process_value(component: &str, pos: usize, name: &str) -> serde_json::Value {
1366 json!({
1367 "component": component,
1368 "pos": pos,
1369 "name": name,
1370 })
1371 }
1372
1373 fn engine_entry(
1374 process_id: &str,
1375 definition: serde_json::Value,
1376 process_name: &str,
1377 status: ProcessStatus,
1378 ) -> ProcessHandleGrantEntry {
1379 let mut record = ProcessRecord::from_registration(
1380 ProcessRegistration::new(
1381 process_id,
1382 ProcessInput::Engine {
1383 kind: "test-engine".to_string(),
1384 payload: json!({
1385 "definition": definition.clone(),
1386 "label": process_name,
1387 }),
1388 },
1389 RecoveryDisposition::Rerunnable,
1390 ProcessProvenance::host(),
1391 )
1392 .with_identity(
1393 ProcessIdentity::new("test-engine")
1394 .with_label(Some(process_name))
1395 .with_definition(Some(definition)),
1396 )
1397 .with_execution_env_ref(Some(ProcessExecutionEnvRef::new(format!(
1398 "process-env:test:{process_id}"
1399 )))),
1400 );
1401 record.status = status;
1402 (
1403 ProcessHandleGrant {
1404 session_id: "session".to_string(),
1405 process_id: process_id.to_string(),
1406 descriptor: ProcessHandleDescriptor::new(Some("test-engine"), Some(process_name)),
1407 },
1408 record,
1409 )
1410 }
1411
1412 #[test]
1413 fn process_list_filter_matches_definition_and_status() {
1414 let target_ref = process_value("target", 0, "target");
1415 let other_ref = process_value("other", 1, "other");
1416 let filter = ProcessListFilter::decode(&json!({
1417 "definition": target_ref,
1418 "status": "completed"
1419 }))
1420 .expect("decode filter");
1421
1422 let matching = engine_entry(
1423 "matching",
1424 target_ref,
1425 "target",
1426 ProcessStatus::Completed {
1427 await_output: ProcessAwaitOutput::from_tool_output(crate::ToolCallOutput::success(
1428 json!(true),
1429 )),
1430 },
1431 );
1432 let wrong_definition = engine_entry(
1433 "wrong-definition",
1434 other_ref,
1435 "other",
1436 ProcessStatus::Completed {
1437 await_output: ProcessAwaitOutput::from_tool_output(crate::ToolCallOutput::success(
1438 json!(true),
1439 )),
1440 },
1441 );
1442
1443 assert_eq!(filter.list_mode(), ProcessListMode::All);
1444 assert!(filter.matches_entry(&matching));
1445 assert!(!filter.matches_entry(&wrong_definition));
1446 }
1447
1448 #[test]
1449 fn process_list_filter_matches_waiting_facet() {
1450 let process_ref = process_value("target", 0, "target");
1451 let mut waiting_entry = engine_entry(
1452 "waiting",
1453 process_ref.clone(),
1454 "target",
1455 ProcessStatus::Running,
1456 );
1457 waiting_entry.1.wait = Some(WaitState {
1458 since_ms: 42,
1459 kind: WaitKind::Signal {
1460 name: "ready".to_string(),
1461 event_type: "signal.ready".to_string(),
1462 key: "process:waiting:signal.ready:1".to_string(),
1463 ordinal: 1,
1464 },
1465 });
1466 let idle_entry = engine_entry("idle", process_ref, "target", ProcessStatus::Running);
1467 let waiting_filter =
1468 ProcessListFilter::decode(&json!({ "waiting": true })).expect("decode waiting filter");
1469 let idle_filter =
1470 ProcessListFilter::decode(&json!({ "waiting": false })).expect("decode idle filter");
1471
1472 assert_eq!(waiting_filter.list_mode(), ProcessListMode::Live);
1473 assert!(waiting_filter.matches_entry(&waiting_entry));
1474 assert!(!waiting_filter.matches_entry(&idle_entry));
1475 assert!(!idle_filter.matches_entry(&waiting_entry));
1476 assert!(idle_filter.matches_entry(&idle_entry));
1477 assert!(
1478 ProcessListFilter::decode(&json!({ "waiting": "yes" }))
1479 .expect_err("invalid waiting filter")
1480 .contains("must be a boolean")
1481 );
1482 }
1483}