1use std::collections::BTreeMap;
2use std::fmt;
3use std::sync::Arc;
4use std::sync::Mutex;
5
6use serde::{Deserialize, Serialize};
7
8use super::events::{
9 ProcessAwaitOutput, ProcessEventType, ProcessTerminalSemantics, ProcessTerminalState,
10 default_process_event_types,
11};
12use super::time::current_epoch_ms;
13use super::validation::{
14 ensure_core_event_types, process_registration_hash, validate_process_registration,
15};
16
17pub type ProcessId = String;
18
19#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize)]
20#[serde(transparent)]
21pub struct SessionScopeId(String);
22
23impl SessionScopeId {
24 pub fn new(value: impl Into<String>) -> Self {
25 Self(value.into())
26 }
27
28 pub fn as_str(&self) -> &str {
29 &self.0
30 }
31}
32
33impl fmt::Display for SessionScopeId {
34 fn fmt(&self, formatter: &mut fmt::Formatter<'_>) -> fmt::Result {
35 formatter.write_str(&self.0)
36 }
37}
38
39impl From<String> for SessionScopeId {
40 fn from(value: String) -> Self {
41 Self::new(value)
42 }
43}
44
45impl From<&str> for SessionScopeId {
46 fn from(value: &str) -> Self {
47 Self::new(value)
48 }
49}
50
51#[derive(Debug, Serialize, Deserialize)]
60#[serde(tag = "type", rename_all = "snake_case")]
61pub enum ProcessInput {
62 ToolCall {
63 call: crate::PreparedToolCall,
64 },
65 Engine {
66 kind: String,
67 #[serde(default)]
68 payload: serde_json::Value,
69 },
70 SessionTurn {
71 create_request: Box<crate::SessionCreateRequest>,
72 turn_input: Box<crate::TurnInput>,
73 output_contract: crate::ToolOutputContract,
74 },
75 External {
76 #[serde(default)]
77 metadata: serde_json::Value,
78 },
79}
80
81impl Clone for ProcessInput {
82 fn clone(&self) -> Self {
83 match self {
84 Self::ToolCall { call } => Self::ToolCall { call: call.clone() },
85 Self::Engine { kind, payload } => Self::Engine {
86 kind: kind.clone(),
87 payload: payload.clone(),
88 },
89 Self::SessionTurn {
90 create_request,
91 turn_input,
92 output_contract,
93 } => Self::SessionTurn {
94 create_request: create_request.clone(),
95 turn_input: turn_input.clone(),
96 output_contract: output_contract.clone(),
97 },
98 Self::External { metadata } => Self::External {
99 metadata: metadata.clone(),
100 },
101 }
102 }
103}
104
105impl PartialEq for ProcessInput {
106 fn eq(&self, other: &Self) -> bool {
107 serde_json::to_value(self).ok() == serde_json::to_value(other).ok()
108 }
109}
110
111impl ProcessInput {
112 pub fn engine_kind(&self) -> &'static str {
113 match self {
114 Self::ToolCall { .. } => "tool",
115 Self::Engine { .. } => "engine",
116 Self::SessionTurn { .. } => "session_turn",
117 Self::External { .. } => "external",
118 }
119 }
120
121 pub fn engine_specific_kind(&self) -> Option<&str> {
122 match self {
123 Self::Engine { kind, .. } => Some(kind.as_str()),
124 _ => None,
125 }
126 }
127}
128
129#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
130#[serde(transparent)]
131pub struct ProcessExecutionEnvRef(String);
132
133impl ProcessExecutionEnvRef {
134 pub fn new(value: impl Into<String>) -> Self {
135 Self(value.into())
136 }
137
138 pub fn as_str(&self) -> &str {
139 &self.0
140 }
141}
142
143impl fmt::Display for ProcessExecutionEnvRef {
144 fn fmt(&self, formatter: &mut fmt::Formatter<'_>) -> fmt::Result {
145 formatter.write_str(&self.0)
146 }
147}
148
149#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
150#[serde(deny_unknown_fields)]
151pub struct ProcessExecutionEnvSpec {
152 #[serde(default)]
153 pub plugin_options: crate::PluginOptions,
154 #[serde(default)]
155 pub policy: crate::SessionPolicy,
156}
157
158impl ProcessExecutionEnvSpec {
159 pub fn new(plugin_options: crate::PluginOptions, policy: crate::SessionPolicy) -> Self {
160 Self {
161 plugin_options,
162 policy,
163 }
164 }
165
166 pub fn stable_ref(&self) -> Result<ProcessExecutionEnvRef, serde_json::Error> {
167 crate::stable_hash::stable_json_sha256_hex(self)
168 .map(|hash| ProcessExecutionEnvRef::new(format!("process-env:sha256:{hash}")))
169 }
170
171 pub fn to_store_bytes(&self) -> Result<Vec<u8>, serde_json::Error> {
172 serde_json::to_vec(self)
173 }
174
175 pub fn from_store_bytes(bytes: &[u8]) -> Result<Self, serde_json::Error> {
176 serde_json::from_slice(bytes)
177 }
178}
179
180#[async_trait::async_trait]
181pub trait ProcessExecutionEnvStore: Send + Sync {
182 fn durability_tier(&self) -> crate::DurabilityTier {
183 crate::DurabilityTier::Inline
184 }
185
186 async fn put_process_execution_env(
187 &self,
188 env_ref: &ProcessExecutionEnvRef,
189 bytes: &[u8],
190 ) -> Result<(), crate::PluginError>;
191
192 async fn get_process_execution_env(
193 &self,
194 env_ref: &ProcessExecutionEnvRef,
195 ) -> Result<Option<Vec<u8>>, crate::PluginError>;
196}
197
198#[derive(Default)]
199pub struct InMemoryProcessExecutionEnvStore {
200 envs: Mutex<BTreeMap<String, Vec<u8>>>,
201}
202
203impl InMemoryProcessExecutionEnvStore {
204 pub fn new() -> Self {
205 Self::default()
206 }
207}
208
209#[async_trait::async_trait]
210impl ProcessExecutionEnvStore for InMemoryProcessExecutionEnvStore {
211 async fn put_process_execution_env(
212 &self,
213 env_ref: &ProcessExecutionEnvRef,
214 bytes: &[u8],
215 ) -> Result<(), crate::PluginError> {
216 self.envs
217 .lock()
218 .map_err(|_| {
219 crate::PluginError::Session("process execution env store lock poisoned".to_string())
220 })?
221 .insert(env_ref.as_str().to_string(), bytes.to_vec());
222 Ok(())
223 }
224
225 async fn get_process_execution_env(
226 &self,
227 env_ref: &ProcessExecutionEnvRef,
228 ) -> Result<Option<Vec<u8>>, crate::PluginError> {
229 Ok(self
230 .envs
231 .lock()
232 .map_err(|_| {
233 crate::PluginError::Session("process execution env store lock poisoned".to_string())
234 })?
235 .get(env_ref.as_str())
236 .cloned())
237 }
238}
239
240pub async fn persist_process_execution_env(
241 env_store: &dyn ProcessExecutionEnvStore,
242 spec: &ProcessExecutionEnvSpec,
243) -> Result<ProcessExecutionEnvRef, crate::PluginError> {
244 let env_ref = spec.stable_ref().map_err(|err| {
245 crate::PluginError::Session(format!("failed to hash process execution env: {err}"))
246 })?;
247 let bytes = spec.to_store_bytes().map_err(|err| {
248 crate::PluginError::Session(format!("failed to encode process execution env: {err}"))
249 })?;
250 env_store
251 .put_process_execution_env(&env_ref, &bytes)
252 .await?;
253 Ok(env_ref)
254}
255
256pub async fn load_process_execution_env(
257 env_store: &dyn ProcessExecutionEnvStore,
258 env_ref: &ProcessExecutionEnvRef,
259) -> Result<ProcessExecutionEnvSpec, crate::PluginError> {
260 let bytes = env_store
261 .get_process_execution_env(env_ref)
262 .await?
263 .ok_or_else(|| {
264 crate::PluginError::Session(format!("missing process execution env `{env_ref}`"))
265 })?;
266 ProcessExecutionEnvSpec::from_store_bytes(&bytes).map_err(|err| {
267 crate::PluginError::Session(format!(
268 "failed to decode process execution env `{env_ref}`: {err}"
269 ))
270 })
271}
272
273#[derive(Clone, Debug, Default, Serialize, Deserialize)]
276pub struct ProcessExecutionContext {
277 #[serde(default, skip_serializing_if = "Option::is_none")]
278 pub causal_invocation: Option<crate::RuntimeInvocation>,
279}
280
281impl ProcessExecutionContext {
282 pub fn with_causal_invocation(mut self, invocation: Option<crate::RuntimeInvocation>) -> Self {
283 self.causal_invocation = invocation;
284 self
285 }
286
287 pub fn is_empty(&self) -> bool {
288 self.causal_invocation.is_none()
289 }
290}
291
292#[derive(Clone)]
293pub struct ProcessOpScope<'scope> {
294 pub(crate) parent_invocation: Option<crate::RuntimeInvocation>,
295 pub(crate) effect_controller: crate::runtime::RuntimeEffectControllerHandle<'scope>,
296 pub(crate) agent_frame_id: Option<crate::AgentFrameId>,
297 pub(crate) target_agent_frame_id: Option<crate::AgentFrameId>,
298}
299
300impl<'scope> ProcessOpScope<'scope> {
301 pub fn new(scoped_effect_controller: crate::ScopedEffectController<'scope>) -> Self {
302 Self {
303 parent_invocation: None,
304 effect_controller: crate::runtime::RuntimeEffectControllerHandle::borrowed(
305 scoped_effect_controller,
306 ),
307 agent_frame_id: None,
308 target_agent_frame_id: None,
309 }
310 }
311
312 pub fn with_parent_invocation(
313 mut self,
314 parent_invocation: Option<crate::RuntimeInvocation>,
315 ) -> Self {
316 self.parent_invocation = parent_invocation;
317 self
318 }
319
320 pub fn with_agent_frame_id(mut self, agent_frame_id: Option<crate::AgentFrameId>) -> Self {
321 self.agent_frame_id = agent_frame_id;
322 self
323 }
324
325 pub fn with_target_agent_frame_id(
326 mut self,
327 agent_frame_id: Option<crate::AgentFrameId>,
328 ) -> Self {
329 self.target_agent_frame_id = agent_frame_id;
330 self
331 }
332
333 pub fn agent_frame_id(&self) -> Option<&str> {
334 self.agent_frame_id.as_deref()
335 }
336
337 pub fn target_agent_frame_id(&self) -> Option<&str> {
338 self.target_agent_frame_id.as_deref()
339 }
340
341 pub(crate) fn controller(&self) -> &dyn crate::RuntimeEffectController {
342 self.effect_controller.controller()
343 }
344}
345
346#[derive(Clone, Debug, Default)]
347pub struct ProcessStartOptions {
348 pub descriptor: Option<ProcessHandleDescriptor>,
349 pub spawn_provenance: Option<ProcessSpawnProvenance>,
357}
358
359#[derive(Clone, Debug, PartialEq, Eq)]
364pub struct ProcessSpawnProvenance {
365 pub originator: ProcessOriginator,
366 pub wake_target: Option<SessionScope>,
367}
368
369impl ProcessStartOptions {
370 pub fn new() -> Self {
371 Self::default()
372 }
373
374 pub fn with_descriptor(mut self, descriptor: ProcessHandleDescriptor) -> Self {
375 self.descriptor = Some(descriptor);
376 self
377 }
378
379 pub fn with_optional_descriptor(mut self, descriptor: Option<ProcessHandleDescriptor>) -> Self {
380 self.descriptor = descriptor;
381 self
382 }
383
384 pub fn with_spawn_provenance(mut self, spawn_provenance: ProcessSpawnProvenance) -> Self {
385 self.spawn_provenance = Some(spawn_provenance);
386 self
387 }
388
389 pub fn execution_context(&self, scope: &ProcessOpScope<'_>) -> ProcessExecutionContext {
390 ProcessExecutionContext {
391 causal_invocation: scope.parent_invocation.clone(),
392 }
393 }
394}
395
396#[derive(Clone, Debug, Serialize, Deserialize)]
398pub struct ProcessStartRequest {
399 pub id: ProcessId,
400 pub input: ProcessInput,
401 #[serde(default, skip_serializing_if = "Option::is_none")]
402 pub env_spec: Option<ProcessExecutionEnvSpec>,
403 pub originator: ProcessOriginator,
404 #[serde(default, skip_serializing_if = "Option::is_none")]
405 pub wake_target: Option<SessionScope>,
406 #[serde(default, skip_serializing_if = "Option::is_none")]
407 pub grant: Option<ProcessStartGrant>,
408 #[serde(default)]
409 pub event_types: Vec<ProcessEventType>,
410}
411
412impl ProcessStartRequest {
413 pub fn new(
414 id: impl Into<ProcessId>,
415 input: ProcessInput,
416 originator: ProcessOriginator,
417 ) -> Self {
418 Self {
419 id: id.into(),
420 input,
421 env_spec: None,
422 originator,
423 wake_target: None,
424 grant: None,
425 event_types: default_process_event_types(),
426 }
427 }
428
429 pub fn external(
430 id: impl Into<ProcessId>,
431 originator: ProcessOriginator,
432 metadata: serde_json::Value,
433 ) -> Self {
434 Self::new(id, ProcessInput::External { metadata }, originator)
435 }
436
437 pub fn with_env_spec(mut self, env_spec: ProcessExecutionEnvSpec) -> Self {
438 self.env_spec = Some(env_spec);
439 self
440 }
441
442 pub fn with_wake_target(mut self, wake_target: Option<SessionScope>) -> Self {
443 self.wake_target = wake_target;
444 self
445 }
446
447 pub fn with_grant(mut self, grant: Option<ProcessStartGrant>) -> Self {
448 self.grant = grant;
449 self
450 }
451
452 pub fn with_event_types(
453 mut self,
454 event_types: impl IntoIterator<Item = ProcessEventType>,
455 ) -> Self {
456 self.event_types = event_types.into_iter().collect();
457 self
458 }
459
460 pub fn with_extra_event_types(
461 mut self,
462 event_types: impl IntoIterator<Item = ProcessEventType>,
463 ) -> Self {
464 self.event_types.extend(event_types);
465 self
466 }
467
468 pub fn into_registration(self, env_ref: Option<ProcessExecutionEnvRef>) -> ProcessRegistration {
469 ProcessRegistration::new(self.id, self.input, ProcessProvenance::new(self.originator))
470 .with_event_types(self.event_types)
471 .with_execution_env_ref(env_ref)
472 .with_wake_target(self.wake_target)
473 }
474}
475
476#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
477pub struct SessionScope {
478 pub session_id: String,
479 #[serde(default, skip_serializing_if = "Option::is_none")]
480 pub agent_frame_id: Option<crate::AgentFrameId>,
481}
482
483#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
484pub struct ProcessProvenance {
485 pub originator: ProcessOriginator,
486 #[serde(default, skip_serializing_if = "Option::is_none")]
487 pub caused_by: Option<crate::CausalRef>,
488}
489
490impl ProcessProvenance {
491 pub fn new(originator: ProcessOriginator) -> Self {
492 Self {
493 originator,
494 caused_by: None,
495 }
496 }
497
498 pub fn host() -> Self {
499 Self::new(ProcessOriginator::host())
500 }
501
502 pub fn session(scope: SessionScope) -> Self {
503 Self::new(ProcessOriginator::session(scope))
504 }
505
506 pub fn with_caused_by(mut self, caused_by: Option<crate::CausalRef>) -> Self {
507 self.caused_by = caused_by;
508 self
509 }
510}
511
512#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
513#[serde(tag = "type", rename_all = "snake_case")]
514pub enum ProcessOriginator {
515 Host,
516 Session { scope: SessionScope },
517}
518
519impl ProcessOriginator {
520 pub fn host() -> Self {
521 Self::Host
522 }
523
524 pub fn session(scope: SessionScope) -> Self {
525 Self::Session { scope }
526 }
527
528 pub fn scope_id(&self) -> String {
529 match self {
530 Self::Host => "host".to_string(),
531 Self::Session { scope } => scope.id().to_string(),
532 }
533 }
534}
535
536impl SessionScope {
537 pub fn new(session_id: impl Into<String>) -> Self {
538 Self {
539 session_id: session_id.into(),
540 agent_frame_id: None,
541 }
542 }
543
544 pub fn for_agent_frame(
545 session_id: impl Into<String>,
546 agent_frame_id: impl Into<crate::AgentFrameId>,
547 ) -> Self {
548 Self {
549 session_id: session_id.into(),
550 agent_frame_id: Some(agent_frame_id.into()),
551 }
552 }
553
554 pub fn id(&self) -> SessionScopeId {
555 match self.agent_frame_id.as_deref() {
556 Some(frame_id) if !frame_id.is_empty() => {
557 SessionScopeId::new(format!("session:{}/frame:{frame_id}", self.session_id))
558 }
559 _ => SessionScopeId::new(format!("session:{}", self.session_id)),
560 }
561 }
562
563 pub fn is_empty(&self) -> bool {
564 self.session_id.is_empty()
565 }
566}
567
568#[derive(Debug, Serialize, Deserialize)]
570pub struct ProcessRegistration {
571 pub id: ProcessId,
572 pub input: Arc<ProcessInput>,
573 pub identity: ProcessIdentity,
574 #[serde(default)]
575 pub event_types: Vec<ProcessEventType>,
576 pub provenance: ProcessProvenance,
577 #[serde(default, skip_serializing_if = "Option::is_none")]
578 pub env_ref: Option<ProcessExecutionEnvRef>,
579 #[serde(default, skip_serializing_if = "Option::is_none")]
580 pub wake_target: Option<SessionScope>,
581}
582
583impl Clone for ProcessRegistration {
584 fn clone(&self) -> Self {
585 Self {
586 id: self.id.clone(),
587 input: Arc::clone(&self.input),
588 identity: self.identity.clone(),
589 event_types: self.event_types.clone(),
590 provenance: self.provenance.clone(),
591 env_ref: self.env_ref.clone(),
592 wake_target: self.wake_target.clone(),
593 }
594 }
595}
596
597impl ProcessRegistration {
598 pub fn new(
599 id: impl Into<ProcessId>,
600 input: ProcessInput,
601 provenance: ProcessProvenance,
602 ) -> Self {
603 let identity = ProcessIdentity::from_process_input(&input);
604 Self {
605 id: id.into(),
606 input: Arc::new(input),
607 identity,
608 event_types: default_process_event_types(),
609 provenance,
610 env_ref: None,
611 wake_target: None,
612 }
613 }
614
615 pub(crate) fn session_start_draft(id: impl Into<ProcessId>, input: ProcessInput) -> Self {
616 Self::new(id, input, ProcessProvenance::host())
617 }
618
619 pub fn with_process_provenance(mut self, provenance: ProcessProvenance) -> Self {
620 self.provenance = provenance;
621 self
622 }
623
624 pub fn with_execution_env_ref(mut self, env_ref: Option<ProcessExecutionEnvRef>) -> Self {
625 self.env_ref = env_ref;
626 self
627 }
628
629 pub fn with_wake_target(mut self, wake_target: Option<SessionScope>) -> Self {
630 self.wake_target = wake_target;
631 self
632 }
633
634 pub fn with_identity(mut self, identity: ProcessIdentity) -> Self {
635 self.identity = identity;
636 self
637 }
638
639 pub fn with_event_types(
640 mut self,
641 event_types: impl IntoIterator<Item = ProcessEventType>,
642 ) -> Self {
643 self.event_types = event_types.into_iter().collect();
644 self
645 }
646
647 pub fn with_extra_event_types(
648 mut self,
649 event_types: impl IntoIterator<Item = ProcessEventType>,
650 ) -> Self {
651 self.event_types.extend(event_types);
652 self
653 }
654}
655
656#[derive(Clone, Debug, Default, PartialEq, Serialize, Deserialize)]
657#[serde(tag = "state", rename_all = "snake_case")]
658pub enum ProcessStatus {
659 #[default]
660 Running,
661 Completed {
662 await_output: ProcessAwaitOutput,
663 },
664 Failed {
665 await_output: ProcessAwaitOutput,
666 },
667 Cancelled {
668 await_output: ProcessAwaitOutput,
669 },
670}
671
672impl ProcessStatus {
673 pub fn from_terminal(terminal: ProcessTerminalSemantics) -> Self {
674 match terminal.state {
675 ProcessTerminalState::Completed => Self::Completed {
676 await_output: terminal.await_output,
677 },
678 ProcessTerminalState::Failed => Self::Failed {
679 await_output: terminal.await_output,
680 },
681 ProcessTerminalState::Cancelled => Self::Cancelled {
682 await_output: terminal.await_output,
683 },
684 }
685 }
686
687 pub fn is_terminal(&self) -> bool {
688 !matches!(self, Self::Running)
689 }
690
691 pub fn label(&self) -> &'static str {
692 match self {
693 Self::Running => "running",
694 Self::Completed { .. } => "completed",
695 Self::Failed { .. } => "failed",
696 Self::Cancelled { .. } => "cancelled",
697 }
698 }
699
700 pub fn terminal_state(&self) -> Option<ProcessTerminalState> {
701 match self {
702 Self::Running => None,
703 Self::Completed { .. } => Some(ProcessTerminalState::Completed),
704 Self::Failed { .. } => Some(ProcessTerminalState::Failed),
705 Self::Cancelled { .. } => Some(ProcessTerminalState::Cancelled),
706 }
707 }
708
709 pub fn await_output(&self) -> Option<&ProcessAwaitOutput> {
710 match self {
711 Self::Running => None,
712 Self::Completed { await_output }
713 | Self::Failed { await_output }
714 | Self::Cancelled { await_output } => Some(await_output),
715 }
716 }
717
718 pub fn terminal_semantics(&self) -> Option<ProcessTerminalSemantics> {
719 Some(ProcessTerminalSemantics {
720 state: self.terminal_state()?,
721 await_output: self.await_output()?.clone(),
722 })
723 }
724}
725
726#[derive(Clone, Debug, Serialize, Deserialize)]
729pub struct ProcessRecord {
730 pub id: ProcessId,
731 pub registration_hash: String,
732 pub input: Arc<ProcessInput>,
733 pub identity: ProcessIdentity,
734 #[serde(default)]
735 pub event_types: Vec<ProcessEventType>,
736 pub provenance: ProcessProvenance,
737 #[serde(default, skip_serializing_if = "Option::is_none")]
738 pub env_ref: Option<ProcessExecutionEnvRef>,
739 #[serde(default, skip_serializing_if = "Option::is_none")]
740 pub wake_target: Option<SessionScope>,
741 #[serde(default)]
742 pub created_at_ms: u64,
743 #[serde(default)]
744 pub updated_at_ms: u64,
745 #[serde(default, skip_serializing_if = "Option::is_none")]
746 pub external_ref: Option<ProcessExternalRef>,
747 #[serde(default, skip_serializing_if = "Option::is_none")]
748 pub wait: Option<WaitState>,
749 #[serde(default)]
750 pub status: ProcessStatus,
751}
752
753#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
754pub struct WaitState {
755 pub kind: WaitKind,
756 pub since_ms: u64,
757}
758
759#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
760#[serde(tag = "kind", rename_all = "snake_case")]
761pub enum WaitKind {
762 Signal {
763 name: String,
764 event_type: String,
765 key: String,
766 ordinal: u64,
767 },
768}
769
770impl ProcessRecord {
771 pub fn from_registration(mut registration: ProcessRegistration) -> Self {
772 ensure_core_event_types(&mut registration);
773 validate_process_registration(®istration)
774 .expect("process registration should be valid before record construction");
775 let registration_hash = process_registration_hash(®istration)
776 .expect("process registration should hash before record construction");
777 Self::from_prepared_registration(registration, registration_hash, current_epoch_ms())
778 }
779
780 pub fn from_prepared_registration(
781 registration: ProcessRegistration,
782 registration_hash: String,
783 now_ms: u64,
784 ) -> Self {
785 Self {
786 id: registration.id,
787 registration_hash,
788 input: registration.input,
789 identity: registration.identity,
790 event_types: registration.event_types,
791 provenance: registration.provenance,
792 env_ref: registration.env_ref,
793 wake_target: registration.wake_target,
794 created_at_ms: now_ms,
795 updated_at_ms: now_ms,
796 external_ref: None,
797 wait: None,
798 status: ProcessStatus::Running,
799 }
800 }
801
802 pub fn is_terminal(&self) -> bool {
803 self.status.is_terminal()
804 }
805
806 pub fn clear_wake_target_for_session(&mut self, session_id: &str) -> bool {
807 let should_clear = self
808 .wake_target
809 .as_ref()
810 .is_some_and(|scope| scope.session_id == session_id);
811 if should_clear {
812 self.wake_target = None;
813 self.updated_at_ms = current_epoch_ms();
814 }
815 should_clear
816 }
817
818 pub fn originator_scope_id(&self) -> String {
819 self.provenance.originator.scope_id()
820 }
821}
822
823#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
830pub struct ProcessIdentity {
831 pub kind: String,
832 #[serde(default, skip_serializing_if = "Option::is_none")]
833 pub label: Option<String>,
834 #[serde(default, skip_serializing_if = "Option::is_none")]
835 pub definition: Option<serde_json::Value>,
836}
837
838impl ProcessIdentity {
839 pub fn new(kind: impl Into<String>) -> Self {
840 Self {
841 kind: kind.into(),
842 label: None,
843 definition: None,
844 }
845 }
846
847 pub fn with_label(mut self, label: Option<impl Into<String>>) -> Self {
848 self.label = label.map(Into::into);
849 self
850 }
851
852 pub fn with_definition(mut self, definition: Option<serde_json::Value>) -> Self {
853 self.definition = definition;
854 self
855 }
856
857 pub fn from_process_input(input: &ProcessInput) -> Self {
858 match input {
859 ProcessInput::ToolCall { call } => {
860 Self::new("tool").with_label(Some(call.tool_name.clone()))
861 }
862 ProcessInput::Engine { kind, .. } => Self::new(kind.clone()),
863 ProcessInput::SessionTurn { create_request, .. } => {
864 let label = create_request
865 .subagent
866 .as_ref()
867 .map(|subagent| subagent.capability.clone())
868 .or_else(|| create_request.usage_source.clone())
869 .or_else(|| create_request.session_id.clone());
870 Self::new("session_turn").with_label(label)
871 }
872 ProcessInput::External { metadata } => {
873 let label = metadata
874 .get("label")
875 .or_else(|| metadata.get("name"))
876 .or_else(|| metadata.get("title"))
877 .and_then(serde_json::Value::as_str)
878 .map(str::to_string);
879 Self::new("external").with_label(label)
880 }
881 }
882 }
883}
884
885pub const PROCESS_LEASE_SCHEMA_VERSION: u32 = 1;
890
891#[derive(Clone, Debug, Serialize, Deserialize)]
905pub struct ProcessLease {
906 pub schema_version: u32,
907 pub process_id: ProcessId,
908 pub owner_id: String,
909 pub lease_token: String,
910 pub fencing_token: u64,
911 pub claimed_at_epoch_ms: u64,
912 pub expires_at_epoch_ms: u64,
913}
914
915#[derive(Clone, Debug, Serialize, Deserialize)]
916pub struct ProcessLeaseCompletion {
917 pub process_id: ProcessId,
918 pub lease_token: String,
919}
920
921impl ProcessLeaseCompletion {
922 pub fn from_lease(lease: &ProcessLease) -> Self {
923 Self {
924 process_id: lease.process_id.clone(),
925 lease_token: lease.lease_token.clone(),
926 }
927 }
928}
929
930#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq)]
932pub struct ProcessExternalRef {
933 pub backend: String,
934 pub id: String,
935 #[serde(default, skip_serializing_if = "Option::is_none")]
936 pub metadata: Option<serde_json::Value>,
937}
938
939#[derive(Clone, Debug, Default, PartialEq, Eq, Serialize, Deserialize)]
940pub struct ProcessHandleDescriptor {
941 #[serde(default, skip_serializing_if = "Option::is_none")]
942 pub kind: Option<String>,
943 #[serde(default, skip_serializing_if = "Option::is_none")]
944 pub label: Option<String>,
945}
946
947impl ProcessHandleDescriptor {
948 pub fn new(kind: Option<impl Into<String>>, label: Option<impl Into<String>>) -> Self {
949 Self {
950 kind: kind.map(Into::into),
951 label: label.map(Into::into),
952 }
953 }
954}
955
956#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
957pub struct ProcessHandleGrant {
958 pub session_id: String,
959 pub process_id: ProcessId,
960 pub descriptor: ProcessHandleDescriptor,
961}
962
963pub type ProcessHandleGrantEntry = (ProcessHandleGrant, ProcessRecord);
964
965#[derive(Clone, Copy, Debug, Default, PartialEq, Eq, Serialize, Deserialize)]
966#[serde(rename_all = "snake_case")]
967pub enum ProcessLifecycleStatus {
968 #[default]
969 Running,
970 Completed,
971 Failed,
972 Cancelled,
973}
974
975impl ProcessLifecycleStatus {
976 pub fn label(self) -> &'static str {
977 match self {
978 Self::Running => "running",
979 Self::Completed => "completed",
980 Self::Failed => "failed",
981 Self::Cancelled => "cancelled",
982 }
983 }
984
985 pub fn is_terminal(self) -> bool {
986 !matches!(self, Self::Running)
987 }
988
989 pub fn terminal_state(self) -> Option<ProcessTerminalState> {
990 match self {
991 Self::Running => None,
992 Self::Completed => Some(ProcessTerminalState::Completed),
993 Self::Failed => Some(ProcessTerminalState::Failed),
994 Self::Cancelled => Some(ProcessTerminalState::Cancelled),
995 }
996 }
997}
998
999impl From<&ProcessStatus> for ProcessLifecycleStatus {
1000 fn from(status: &ProcessStatus) -> Self {
1001 match status {
1002 ProcessStatus::Running => Self::Running,
1003 ProcessStatus::Completed { .. } => Self::Completed,
1004 ProcessStatus::Failed { .. } => Self::Failed,
1005 ProcessStatus::Cancelled { .. } => Self::Cancelled,
1006 }
1007 }
1008}
1009
1010impl From<ProcessStatus> for ProcessLifecycleStatus {
1011 fn from(status: ProcessStatus) -> Self {
1012 Self::from(&status)
1013 }
1014}
1015
1016#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
1017pub struct ProcessHandleSummary {
1018 #[serde(rename = "__handle__")]
1019 pub handle_type: String,
1020 pub id: ProcessId,
1021 pub process_id: ProcessId,
1022 pub descriptor: ProcessHandleDescriptor,
1023 #[serde(default, skip_serializing_if = "Option::is_none")]
1024 pub definition: Option<serde_json::Value>,
1025 pub status: ProcessLifecycleStatus,
1026}
1027
1028impl ProcessHandleSummary {
1029 pub fn new(
1030 process_id: impl Into<ProcessId>,
1031 descriptor: ProcessHandleDescriptor,
1032 status: ProcessLifecycleStatus,
1033 ) -> Self {
1034 let process_id = process_id.into();
1035 Self {
1036 handle_type: "process".to_string(),
1037 id: process_id.clone(),
1038 process_id,
1039 descriptor,
1040 definition: None,
1041 status,
1042 }
1043 }
1044
1045 pub fn with_definition(mut self, definition: Option<serde_json::Value>) -> Self {
1046 self.definition = definition;
1047 self
1048 }
1049
1050 pub fn from_grant_record(grant: ProcessHandleGrant, record: ProcessRecord) -> Self {
1051 let definition = record.identity.definition.clone();
1052 Self::new(
1053 record.id,
1054 grant.descriptor,
1055 ProcessLifecycleStatus::from(record.status),
1056 )
1057 .with_definition(definition)
1058 }
1059}
1060
1061impl From<ProcessHandleGrantEntry> for ProcessHandleSummary {
1062 fn from((grant, record): ProcessHandleGrantEntry) -> Self {
1063 Self::from_grant_record(grant, record)
1064 }
1065}
1066
1067#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
1068pub struct ProcessCancelSummary {
1069 pub process_id: ProcessId,
1070 pub status: ProcessLifecycleStatus,
1071}
1072
1073impl ProcessCancelSummary {
1074 pub fn from_record(record: ProcessRecord) -> Self {
1075 Self {
1076 process_id: record.id,
1077 status: ProcessLifecycleStatus::from(record.status),
1078 }
1079 }
1080}
1081
1082#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)]
1083pub enum ProcessStatusFilter {
1084 #[default]
1085 Running,
1086 Completed,
1087 Failed,
1088 Cancelled,
1089 Any,
1090}
1091
1092impl ProcessStatusFilter {
1093 pub fn decode(value: Option<&str>) -> Result<Self, String> {
1094 match value.unwrap_or("running") {
1095 "running" => Ok(Self::Running),
1096 "completed" => Ok(Self::Completed),
1097 "failed" => Ok(Self::Failed),
1098 "cancelled" => Ok(Self::Cancelled),
1099 "any" => Ok(Self::Any),
1100 other => Err(format!(
1101 "processes.list status must be `running`, `completed`, `failed`, `cancelled`, or `any`, got `{other}`"
1102 )),
1103 }
1104 }
1105
1106 pub fn list_mode(self) -> ProcessListMode {
1107 match self {
1108 Self::Running => ProcessListMode::Live,
1109 Self::Completed | Self::Failed | Self::Cancelled | Self::Any => ProcessListMode::All,
1110 }
1111 }
1112
1113 pub fn matches(self, status: ProcessLifecycleStatus) -> bool {
1114 match self {
1115 Self::Running => status == ProcessLifecycleStatus::Running,
1116 Self::Completed => status == ProcessLifecycleStatus::Completed,
1117 Self::Failed => status == ProcessLifecycleStatus::Failed,
1118 Self::Cancelled => status == ProcessLifecycleStatus::Cancelled,
1119 Self::Any => true,
1120 }
1121 }
1122}
1123
1124#[derive(Clone, Debug, Default, PartialEq)]
1125pub struct ProcessListFilter {
1126 pub definition: Option<serde_json::Value>,
1127 pub status: ProcessStatusFilter,
1128 pub waiting: Option<bool>,
1129}
1130
1131impl ProcessListFilter {
1132 pub fn decode(args: &serde_json::Value) -> Result<Self, String> {
1133 let map = args
1134 .as_object()
1135 .ok_or_else(|| "processes.list expects a record of process filters".to_string())?;
1136 for key in map.keys() {
1137 match key.as_str() {
1138 "definition" | "status" | "waiting" => {}
1139 _ => return Err(format!("processes.list unknown filter `{key}`")),
1140 }
1141 }
1142 let definition = args.get("definition").cloned();
1143 let status =
1144 ProcessStatusFilter::decode(args.get("status").and_then(serde_json::Value::as_str))?;
1145 let waiting = args
1146 .get("waiting")
1147 .map(|value| {
1148 value
1149 .as_bool()
1150 .ok_or_else(|| "processes.list `waiting` filter must be a boolean".to_string())
1151 })
1152 .transpose()?;
1153 Ok(Self {
1154 definition,
1155 status,
1156 waiting,
1157 })
1158 }
1159
1160 pub fn list_mode(&self) -> ProcessListMode {
1161 self.status.list_mode()
1162 }
1163
1164 pub fn matches_entry(&self, entry: &ProcessHandleGrantEntry) -> bool {
1165 let (_grant, record) = entry;
1166 self.matches_record(record)
1167 }
1168
1169 pub fn matches_record(&self, record: &ProcessRecord) -> bool {
1170 let status = ProcessLifecycleStatus::from(&record.status);
1171 self.status.matches(status)
1172 && self
1173 .definition
1174 .as_ref()
1175 .is_none_or(|definition| record.identity.definition.as_ref() == Some(definition))
1176 && self
1177 .waiting
1178 .is_none_or(|waiting| record.wait.is_some() == waiting)
1179 }
1180}
1181
1182#[derive(Clone, Copy, Debug, Default, PartialEq, Eq, Serialize, Deserialize)]
1183#[serde(rename_all = "snake_case")]
1184pub enum ProcessListMode {
1185 #[default]
1186 Live,
1187 All,
1188}
1189
1190impl ProcessListMode {
1191 pub fn as_str(self) -> &'static str {
1192 match self {
1193 Self::Live => "live",
1194 Self::All => "all",
1195 }
1196 }
1197}
1198
1199#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
1200pub struct ProcessStartGrant {
1201 pub session_scope: SessionScope,
1202 pub descriptor: ProcessHandleDescriptor,
1203}
1204
1205#[derive(Clone, Debug, Default, PartialEq, Eq, Serialize, Deserialize)]
1206pub struct ProcessSessionDeleteReport {
1207 pub session_id: String,
1208 pub revoked_handle_count: usize,
1209 pub deleted_wake_count: usize,
1210 pub orphaned_process_ids: Vec<String>,
1211 pub preserved_process_ids: Vec<String>,
1212}
1213
1214#[cfg(test)]
1215mod tests {
1216 use serde_json::json;
1217
1218 use super::*;
1219
1220 fn process_value(component: &str, pos: usize, name: &str) -> serde_json::Value {
1221 json!({
1222 "component": component,
1223 "pos": pos,
1224 "name": name,
1225 })
1226 }
1227
1228 fn engine_entry(
1229 process_id: &str,
1230 definition: serde_json::Value,
1231 process_name: &str,
1232 status: ProcessStatus,
1233 ) -> ProcessHandleGrantEntry {
1234 let mut record = ProcessRecord::from_registration(
1235 ProcessRegistration::new(
1236 process_id,
1237 ProcessInput::Engine {
1238 kind: "test-engine".to_string(),
1239 payload: json!({
1240 "definition": definition.clone(),
1241 "label": process_name,
1242 }),
1243 },
1244 ProcessProvenance::host(),
1245 )
1246 .with_identity(
1247 ProcessIdentity::new("test-engine")
1248 .with_label(Some(process_name))
1249 .with_definition(Some(definition)),
1250 )
1251 .with_execution_env_ref(Some(ProcessExecutionEnvRef::new(format!(
1252 "process-env:test:{process_id}"
1253 )))),
1254 );
1255 record.status = status;
1256 (
1257 ProcessHandleGrant {
1258 session_id: "session".to_string(),
1259 process_id: process_id.to_string(),
1260 descriptor: ProcessHandleDescriptor::new(Some("test-engine"), Some(process_name)),
1261 },
1262 record,
1263 )
1264 }
1265
1266 #[test]
1267 fn process_list_filter_matches_definition_and_status() {
1268 let target_ref = process_value("target", 0, "target");
1269 let other_ref = process_value("other", 1, "other");
1270 let filter = ProcessListFilter::decode(&json!({
1271 "definition": target_ref,
1272 "status": "completed"
1273 }))
1274 .expect("decode filter");
1275
1276 let matching = engine_entry(
1277 "matching",
1278 target_ref,
1279 "target",
1280 ProcessStatus::Completed {
1281 await_output: ProcessAwaitOutput::from_tool_output(crate::ToolCallOutput::success(
1282 json!(true),
1283 )),
1284 },
1285 );
1286 let wrong_definition = engine_entry(
1287 "wrong-definition",
1288 other_ref,
1289 "other",
1290 ProcessStatus::Completed {
1291 await_output: ProcessAwaitOutput::from_tool_output(crate::ToolCallOutput::success(
1292 json!(true),
1293 )),
1294 },
1295 );
1296
1297 assert_eq!(filter.list_mode(), ProcessListMode::All);
1298 assert!(filter.matches_entry(&matching));
1299 assert!(!filter.matches_entry(&wrong_definition));
1300 }
1301
1302 #[test]
1303 fn process_list_filter_matches_waiting_facet() {
1304 let process_ref = process_value("target", 0, "target");
1305 let mut waiting_entry = engine_entry(
1306 "waiting",
1307 process_ref.clone(),
1308 "target",
1309 ProcessStatus::Running,
1310 );
1311 waiting_entry.1.wait = Some(WaitState {
1312 since_ms: 42,
1313 kind: WaitKind::Signal {
1314 name: "ready".to_string(),
1315 event_type: "signal.ready".to_string(),
1316 key: "process:waiting:signal.ready:1".to_string(),
1317 ordinal: 1,
1318 },
1319 });
1320 let idle_entry = engine_entry("idle", process_ref, "target", ProcessStatus::Running);
1321 let waiting_filter =
1322 ProcessListFilter::decode(&json!({ "waiting": true })).expect("decode waiting filter");
1323 let idle_filter =
1324 ProcessListFilter::decode(&json!({ "waiting": false })).expect("decode idle filter");
1325
1326 assert_eq!(waiting_filter.list_mode(), ProcessListMode::Live);
1327 assert!(waiting_filter.matches_entry(&waiting_entry));
1328 assert!(!waiting_filter.matches_entry(&idle_entry));
1329 assert!(!idle_filter.matches_entry(&waiting_entry));
1330 assert!(idle_filter.matches_entry(&idle_entry));
1331 assert!(
1332 ProcessListFilter::decode(&json!({ "waiting": "yes" }))
1333 .expect_err("invalid waiting filter")
1334 .contains("must be a boolean")
1335 );
1336 }
1337}