1use std::collections::BTreeMap;
2use std::fmt;
3use std::sync::Arc;
4use std::sync::Mutex;
5
6use serde::{Deserialize, Serialize};
7
8use super::events::{
9 ProcessAwaitOutput, ProcessEventType, ProcessTerminalSemantics, ProcessTerminalState,
10 default_process_event_types,
11};
12use super::validation::{
13 ensure_core_event_types, process_registration_hash, validate_process_registration,
14};
15
/// Identifier of a process; a plain alias for `String`.
pub type ProcessId = String;
17
18#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize)]
19#[serde(transparent)]
20pub struct SessionScopeId(String);
21
22impl SessionScopeId {
23 pub fn new(value: impl Into<String>) -> Self {
24 Self(value.into())
25 }
26
27 pub fn as_str(&self) -> &str {
28 &self.0
29 }
30}
31
32impl fmt::Display for SessionScopeId {
33 fn fmt(&self, formatter: &mut fmt::Formatter<'_>) -> fmt::Result {
34 formatter.write_str(&self.0)
35 }
36}
37
38impl From<String> for SessionScopeId {
39 fn from(value: String) -> Self {
40 Self::new(value)
41 }
42}
43
44impl From<&str> for SessionScopeId {
45 fn from(value: &str) -> Self {
46 Self::new(value)
47 }
48}
49
50#[derive(Debug, Serialize, Deserialize)]
59#[serde(tag = "type", rename_all = "snake_case")]
60pub enum ProcessInput {
61 ToolCall {
62 call: crate::PreparedToolCall,
63 },
64 Engine {
65 kind: String,
66 #[serde(default)]
67 payload: serde_json::Value,
68 },
69 SessionTurn {
70 create_request: Box<crate::SessionCreateRequest>,
71 turn_input: Box<crate::TurnInput>,
72 output_contract: crate::ToolOutputContract,
73 },
74 External {
75 #[serde(default)]
76 metadata: serde_json::Value,
77 },
78}
79
80impl Clone for ProcessInput {
81 fn clone(&self) -> Self {
82 match self {
83 Self::ToolCall { call } => Self::ToolCall { call: call.clone() },
84 Self::Engine { kind, payload } => Self::Engine {
85 kind: kind.clone(),
86 payload: payload.clone(),
87 },
88 Self::SessionTurn {
89 create_request,
90 turn_input,
91 output_contract,
92 } => Self::SessionTurn {
93 create_request: create_request.clone(),
94 turn_input: turn_input.clone(),
95 output_contract: output_contract.clone(),
96 },
97 Self::External { metadata } => Self::External {
98 metadata: metadata.clone(),
99 },
100 }
101 }
102}
103
104impl PartialEq for ProcessInput {
105 fn eq(&self, other: &Self) -> bool {
106 serde_json::to_value(self).ok() == serde_json::to_value(other).ok()
107 }
108}
109
110impl ProcessInput {
111 pub fn engine_kind(&self) -> &'static str {
112 match self {
113 Self::ToolCall { .. } => "tool",
114 Self::Engine { .. } => "engine",
115 Self::SessionTurn { .. } => "session_turn",
116 Self::External { .. } => "external",
117 }
118 }
119
120 pub fn engine_specific_kind(&self) -> Option<&str> {
121 match self {
122 Self::Engine { kind, .. } => Some(kind.as_str()),
123 _ => None,
124 }
125 }
126}
127
128#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
129#[serde(transparent)]
130pub struct ProcessExecutionEnvRef(String);
131
132impl ProcessExecutionEnvRef {
133 pub fn new(value: impl Into<String>) -> Self {
134 Self(value.into())
135 }
136
137 pub fn as_str(&self) -> &str {
138 &self.0
139 }
140}
141
142impl fmt::Display for ProcessExecutionEnvRef {
143 fn fmt(&self, formatter: &mut fmt::Formatter<'_>) -> fmt::Result {
144 formatter.write_str(&self.0)
145 }
146}
147
148#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
149#[serde(deny_unknown_fields)]
150pub struct ProcessExecutionEnvSpec {
151 #[serde(default)]
152 pub plugin_options: crate::PluginOptions,
153 #[serde(default)]
154 pub policy: crate::SessionPolicy,
155}
156
157impl ProcessExecutionEnvSpec {
158 pub fn new(plugin_options: crate::PluginOptions, policy: crate::SessionPolicy) -> Self {
159 Self {
160 plugin_options,
161 policy,
162 }
163 }
164
165 pub fn stable_ref(&self) -> Result<ProcessExecutionEnvRef, serde_json::Error> {
166 crate::stable_hash::stable_json_sha256_hex(self)
167 .map(|hash| ProcessExecutionEnvRef::new(format!("process-env:sha256:{hash}")))
168 }
169
170 pub fn to_store_bytes(&self) -> Result<Vec<u8>, serde_json::Error> {
171 serde_json::to_vec(self)
172 }
173
174 pub fn from_store_bytes(bytes: &[u8]) -> Result<Self, serde_json::Error> {
175 serde_json::from_slice(bytes)
176 }
177}
178
/// Storage backend for encoded process execution environments, keyed by
/// [`ProcessExecutionEnvRef`].
#[async_trait::async_trait]
pub trait ProcessExecutionEnvStore: Send + Sync {
    /// Durability tier reported by this store; the default implementation
    /// returns `DurabilityTier::Inline`.
    fn durability_tier(&self) -> crate::DurabilityTier {
        crate::DurabilityTier::Inline
    }

    /// Stores `bytes` under `env_ref`.
    async fn put_process_execution_env(
        &self,
        env_ref: &ProcessExecutionEnvRef,
        bytes: &[u8],
    ) -> Result<(), crate::PluginError>;

    /// Looks up the bytes stored under `env_ref`; `Ok(None)` signals that
    /// nothing is stored for that reference.
    async fn get_process_execution_env(
        &self,
        env_ref: &ProcessExecutionEnvRef,
    ) -> Result<Option<Vec<u8>>, crate::PluginError>;
}
196
197#[derive(Default)]
198pub struct InMemoryProcessExecutionEnvStore {
199 envs: Mutex<BTreeMap<String, Vec<u8>>>,
200}
201
202impl InMemoryProcessExecutionEnvStore {
203 pub fn new() -> Self {
204 Self::default()
205 }
206}
207
208#[async_trait::async_trait]
209impl ProcessExecutionEnvStore for InMemoryProcessExecutionEnvStore {
210 async fn put_process_execution_env(
211 &self,
212 env_ref: &ProcessExecutionEnvRef,
213 bytes: &[u8],
214 ) -> Result<(), crate::PluginError> {
215 self.envs
216 .lock()
217 .map_err(|_| {
218 crate::PluginError::Session("process execution env store lock poisoned".to_string())
219 })?
220 .insert(env_ref.as_str().to_string(), bytes.to_vec());
221 Ok(())
222 }
223
224 async fn get_process_execution_env(
225 &self,
226 env_ref: &ProcessExecutionEnvRef,
227 ) -> Result<Option<Vec<u8>>, crate::PluginError> {
228 Ok(self
229 .envs
230 .lock()
231 .map_err(|_| {
232 crate::PluginError::Session("process execution env store lock poisoned".to_string())
233 })?
234 .get(env_ref.as_str())
235 .cloned())
236 }
237}
238
239pub async fn persist_process_execution_env(
240 env_store: &dyn ProcessExecutionEnvStore,
241 spec: &ProcessExecutionEnvSpec,
242) -> Result<ProcessExecutionEnvRef, crate::PluginError> {
243 let env_ref = spec.stable_ref().map_err(|err| {
244 crate::PluginError::Session(format!("failed to hash process execution env: {err}"))
245 })?;
246 let bytes = spec.to_store_bytes().map_err(|err| {
247 crate::PluginError::Session(format!("failed to encode process execution env: {err}"))
248 })?;
249 env_store
250 .put_process_execution_env(&env_ref, &bytes)
251 .await?;
252 Ok(env_ref)
253}
254
255pub async fn load_process_execution_env(
256 env_store: &dyn ProcessExecutionEnvStore,
257 env_ref: &ProcessExecutionEnvRef,
258) -> Result<ProcessExecutionEnvSpec, crate::PluginError> {
259 let bytes = env_store
260 .get_process_execution_env(env_ref)
261 .await?
262 .ok_or_else(|| {
263 crate::PluginError::Session(format!("missing process execution env `{env_ref}`"))
264 })?;
265 ProcessExecutionEnvSpec::from_store_bytes(&bytes).map_err(|err| {
266 crate::PluginError::Session(format!(
267 "failed to decode process execution env `{env_ref}`: {err}"
268 ))
269 })
270}
271
272#[derive(Clone, Debug, Default, Serialize, Deserialize)]
275pub struct ProcessExecutionContext {
276 #[serde(default, skip_serializing_if = "Option::is_none")]
277 pub causal_invocation: Option<crate::RuntimeInvocation>,
278}
279
280impl ProcessExecutionContext {
281 pub fn with_causal_invocation(mut self, invocation: Option<crate::RuntimeInvocation>) -> Self {
282 self.causal_invocation = invocation;
283 self
284 }
285
286 pub fn is_empty(&self) -> bool {
287 self.causal_invocation.is_none()
288 }
289}
290
291#[derive(Clone)]
292pub struct ProcessOpScope<'scope> {
293 pub(crate) parent_invocation: Option<crate::RuntimeInvocation>,
294 pub(crate) effect_controller: crate::runtime::RuntimeEffectControllerHandle<'scope>,
295 pub(crate) agent_frame_id: Option<crate::AgentFrameId>,
296 pub(crate) target_agent_frame_id: Option<crate::AgentFrameId>,
297}
298
299impl<'scope> ProcessOpScope<'scope> {
300 pub fn new(scoped_effect_controller: crate::ScopedEffectController<'scope>) -> Self {
301 Self {
302 parent_invocation: None,
303 effect_controller: crate::runtime::RuntimeEffectControllerHandle::borrowed(
304 scoped_effect_controller,
305 ),
306 agent_frame_id: None,
307 target_agent_frame_id: None,
308 }
309 }
310
311 pub fn with_parent_invocation(
312 mut self,
313 parent_invocation: Option<crate::RuntimeInvocation>,
314 ) -> Self {
315 self.parent_invocation = parent_invocation;
316 self
317 }
318
319 pub fn with_agent_frame_id(mut self, agent_frame_id: Option<crate::AgentFrameId>) -> Self {
320 self.agent_frame_id = agent_frame_id;
321 self
322 }
323
324 pub fn with_target_agent_frame_id(
325 mut self,
326 agent_frame_id: Option<crate::AgentFrameId>,
327 ) -> Self {
328 self.target_agent_frame_id = agent_frame_id;
329 self
330 }
331
332 pub fn agent_frame_id(&self) -> Option<&str> {
333 self.agent_frame_id.as_deref()
334 }
335
336 pub fn target_agent_frame_id(&self) -> Option<&str> {
337 self.target_agent_frame_id.as_deref()
338 }
339
340 pub(crate) fn controller(&self) -> &dyn crate::RuntimeEffectController {
341 self.effect_controller.controller()
342 }
343}
344
345#[derive(Clone, Debug, Default)]
346pub struct ProcessStartOptions {
347 pub descriptor: Option<ProcessHandleDescriptor>,
348 pub spawn_provenance: Option<ProcessSpawnProvenance>,
356}
357
358#[derive(Clone, Debug, PartialEq, Eq)]
363pub struct ProcessSpawnProvenance {
364 pub originator: ProcessOriginator,
365 pub wake_target: Option<SessionScope>,
366}
367
368impl ProcessStartOptions {
369 pub fn new() -> Self {
370 Self::default()
371 }
372
373 pub fn with_descriptor(mut self, descriptor: ProcessHandleDescriptor) -> Self {
374 self.descriptor = Some(descriptor);
375 self
376 }
377
378 pub fn with_optional_descriptor(mut self, descriptor: Option<ProcessHandleDescriptor>) -> Self {
379 self.descriptor = descriptor;
380 self
381 }
382
383 pub fn with_spawn_provenance(mut self, spawn_provenance: ProcessSpawnProvenance) -> Self {
384 self.spawn_provenance = Some(spawn_provenance);
385 self
386 }
387
388 pub fn execution_context(&self, scope: &ProcessOpScope<'_>) -> ProcessExecutionContext {
389 ProcessExecutionContext {
390 causal_invocation: scope.parent_invocation.clone(),
391 }
392 }
393}
394
395#[derive(Clone, Debug, Serialize, Deserialize)]
397pub struct ProcessStartRequest {
398 pub id: ProcessId,
399 pub input: ProcessInput,
400 #[serde(default, skip_serializing_if = "Option::is_none")]
401 pub env_spec: Option<ProcessExecutionEnvSpec>,
402 pub originator: ProcessOriginator,
403 #[serde(default, skip_serializing_if = "Option::is_none")]
404 pub wake_target: Option<SessionScope>,
405 #[serde(default, skip_serializing_if = "Option::is_none")]
406 pub grant: Option<ProcessStartGrant>,
407 #[serde(default)]
408 pub event_types: Vec<ProcessEventType>,
409}
410
411impl ProcessStartRequest {
412 pub fn new(
413 id: impl Into<ProcessId>,
414 input: ProcessInput,
415 originator: ProcessOriginator,
416 ) -> Self {
417 Self {
418 id: id.into(),
419 input,
420 env_spec: None,
421 originator,
422 wake_target: None,
423 grant: None,
424 event_types: default_process_event_types(),
425 }
426 }
427
428 pub fn external(
429 id: impl Into<ProcessId>,
430 originator: ProcessOriginator,
431 metadata: serde_json::Value,
432 ) -> Self {
433 Self::new(id, ProcessInput::External { metadata }, originator)
434 }
435
436 pub fn with_env_spec(mut self, env_spec: ProcessExecutionEnvSpec) -> Self {
437 self.env_spec = Some(env_spec);
438 self
439 }
440
441 pub fn with_wake_target(mut self, wake_target: Option<SessionScope>) -> Self {
442 self.wake_target = wake_target;
443 self
444 }
445
446 pub fn with_grant(mut self, grant: Option<ProcessStartGrant>) -> Self {
447 self.grant = grant;
448 self
449 }
450
451 pub fn with_event_types(
452 mut self,
453 event_types: impl IntoIterator<Item = ProcessEventType>,
454 ) -> Self {
455 self.event_types = event_types.into_iter().collect();
456 self
457 }
458
459 pub fn with_extra_event_types(
460 mut self,
461 event_types: impl IntoIterator<Item = ProcessEventType>,
462 ) -> Self {
463 self.event_types.extend(event_types);
464 self
465 }
466
467 pub fn into_registration(self, env_ref: Option<ProcessExecutionEnvRef>) -> ProcessRegistration {
468 ProcessRegistration::new(self.id, self.input, ProcessProvenance::new(self.originator))
469 .with_event_types(self.event_types)
470 .with_execution_env_ref(env_ref)
471 .with_wake_target(self.wake_target)
472 }
473}
474
/// A session, optionally narrowed to one agent frame.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub struct SessionScope {
    /// Id of the session.
    pub session_id: String,
    /// Optional agent frame id; omitted from the serialized form when `None`.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub agent_frame_id: Option<crate::AgentFrameId>,
}
481
482#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
483pub struct ProcessProvenance {
484 pub originator: ProcessOriginator,
485 #[serde(default, skip_serializing_if = "Option::is_none")]
486 pub caused_by: Option<crate::CausalRef>,
487}
488
489impl ProcessProvenance {
490 pub fn new(originator: ProcessOriginator) -> Self {
491 Self {
492 originator,
493 caused_by: None,
494 }
495 }
496
497 pub fn host() -> Self {
498 Self::new(ProcessOriginator::host())
499 }
500
501 pub fn session(scope: SessionScope) -> Self {
502 Self::new(ProcessOriginator::session(scope))
503 }
504
505 pub fn with_caused_by(mut self, caused_by: Option<crate::CausalRef>) -> Self {
506 self.caused_by = caused_by;
507 self
508 }
509}
510
511#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
512#[serde(tag = "type", rename_all = "snake_case")]
513pub enum ProcessOriginator {
514 Host,
515 Session { scope: SessionScope },
516}
517
518impl ProcessOriginator {
519 pub fn host() -> Self {
520 Self::Host
521 }
522
523 pub fn session(scope: SessionScope) -> Self {
524 Self::Session { scope }
525 }
526
527 pub fn scope_id(&self) -> String {
528 match self {
529 Self::Host => "host".to_string(),
530 Self::Session { scope } => scope.id().to_string(),
531 }
532 }
533}
534
535impl SessionScope {
536 pub fn new(session_id: impl Into<String>) -> Self {
537 Self {
538 session_id: session_id.into(),
539 agent_frame_id: None,
540 }
541 }
542
543 pub fn for_agent_frame(
544 session_id: impl Into<String>,
545 agent_frame_id: impl Into<crate::AgentFrameId>,
546 ) -> Self {
547 Self {
548 session_id: session_id.into(),
549 agent_frame_id: Some(agent_frame_id.into()),
550 }
551 }
552
553 pub fn id(&self) -> SessionScopeId {
554 match self.agent_frame_id.as_deref() {
555 Some(frame_id) if !frame_id.is_empty() => {
556 SessionScopeId::new(format!("session:{}/frame:{frame_id}", self.session_id))
557 }
558 _ => SessionScopeId::new(format!("session:{}", self.session_id)),
559 }
560 }
561
562 pub fn is_empty(&self) -> bool {
563 self.session_id.is_empty()
564 }
565}
566
567#[derive(Debug, Serialize, Deserialize)]
569pub struct ProcessRegistration {
570 pub id: ProcessId,
571 pub input: Arc<ProcessInput>,
572 pub identity: ProcessIdentity,
573 #[serde(default)]
574 pub event_types: Vec<ProcessEventType>,
575 pub provenance: ProcessProvenance,
576 #[serde(default, skip_serializing_if = "Option::is_none")]
577 pub env_ref: Option<ProcessExecutionEnvRef>,
578 #[serde(default, skip_serializing_if = "Option::is_none")]
579 pub wake_target: Option<SessionScope>,
580}
581
582impl Clone for ProcessRegistration {
583 fn clone(&self) -> Self {
584 Self {
585 id: self.id.clone(),
586 input: Arc::clone(&self.input),
587 identity: self.identity.clone(),
588 event_types: self.event_types.clone(),
589 provenance: self.provenance.clone(),
590 env_ref: self.env_ref.clone(),
591 wake_target: self.wake_target.clone(),
592 }
593 }
594}
595
596impl ProcessRegistration {
597 pub fn new(
598 id: impl Into<ProcessId>,
599 input: ProcessInput,
600 provenance: ProcessProvenance,
601 ) -> Self {
602 let identity = ProcessIdentity::from_process_input(&input);
603 Self {
604 id: id.into(),
605 input: Arc::new(input),
606 identity,
607 event_types: default_process_event_types(),
608 provenance,
609 env_ref: None,
610 wake_target: None,
611 }
612 }
613
614 pub(crate) fn session_start_draft(id: impl Into<ProcessId>, input: ProcessInput) -> Self {
615 Self::new(id, input, ProcessProvenance::host())
616 }
617
618 pub fn with_process_provenance(mut self, provenance: ProcessProvenance) -> Self {
619 self.provenance = provenance;
620 self
621 }
622
623 pub fn with_execution_env_ref(mut self, env_ref: Option<ProcessExecutionEnvRef>) -> Self {
624 self.env_ref = env_ref;
625 self
626 }
627
628 pub fn with_wake_target(mut self, wake_target: Option<SessionScope>) -> Self {
629 self.wake_target = wake_target;
630 self
631 }
632
633 pub fn with_identity(mut self, identity: ProcessIdentity) -> Self {
634 self.identity = identity;
635 self
636 }
637
638 pub fn with_event_types(
639 mut self,
640 event_types: impl IntoIterator<Item = ProcessEventType>,
641 ) -> Self {
642 self.event_types = event_types.into_iter().collect();
643 self
644 }
645
646 pub fn with_extra_event_types(
647 mut self,
648 event_types: impl IntoIterator<Item = ProcessEventType>,
649 ) -> Self {
650 self.event_types.extend(event_types);
651 self
652 }
653}
654
655#[derive(Clone, Debug, Default, PartialEq, Serialize, Deserialize)]
656#[serde(tag = "state", rename_all = "snake_case")]
657pub enum ProcessStatus {
658 #[default]
659 Running,
660 Completed {
661 await_output: ProcessAwaitOutput,
662 },
663 Failed {
664 await_output: ProcessAwaitOutput,
665 },
666 Cancelled {
667 await_output: ProcessAwaitOutput,
668 },
669}
670
671impl ProcessStatus {
672 pub fn from_terminal(terminal: ProcessTerminalSemantics) -> Self {
673 match terminal.state {
674 ProcessTerminalState::Completed => Self::Completed {
675 await_output: terminal.await_output,
676 },
677 ProcessTerminalState::Failed => Self::Failed {
678 await_output: terminal.await_output,
679 },
680 ProcessTerminalState::Cancelled => Self::Cancelled {
681 await_output: terminal.await_output,
682 },
683 }
684 }
685
686 pub fn is_terminal(&self) -> bool {
687 !matches!(self, Self::Running)
688 }
689
690 pub fn label(&self) -> &'static str {
691 match self {
692 Self::Running => "running",
693 Self::Completed { .. } => "completed",
694 Self::Failed { .. } => "failed",
695 Self::Cancelled { .. } => "cancelled",
696 }
697 }
698
699 pub fn terminal_state(&self) -> Option<ProcessTerminalState> {
700 match self {
701 Self::Running => None,
702 Self::Completed { .. } => Some(ProcessTerminalState::Completed),
703 Self::Failed { .. } => Some(ProcessTerminalState::Failed),
704 Self::Cancelled { .. } => Some(ProcessTerminalState::Cancelled),
705 }
706 }
707
708 pub fn await_output(&self) -> Option<&ProcessAwaitOutput> {
709 match self {
710 Self::Running => None,
711 Self::Completed { await_output }
712 | Self::Failed { await_output }
713 | Self::Cancelled { await_output } => Some(await_output),
714 }
715 }
716
717 pub fn terminal_semantics(&self) -> Option<ProcessTerminalSemantics> {
718 Some(ProcessTerminalSemantics {
719 state: self.terminal_state()?,
720 await_output: self.await_output()?.clone(),
721 })
722 }
723}
724
/// Stored record of a process: the registration data plus a registration
/// hash, timestamps, an optional external reference, wait state and status.
///
/// Optional fields are omitted from the serialized form when `None`; the
/// `#[serde(default)]` fields fall back to their `Default` value when missing.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct ProcessRecord {
    pub id: ProcessId,
    /// Hash of the registration, supplied by the caller at construction.
    pub registration_hash: String,
    /// Process input, shared behind an `Arc`.
    pub input: Arc<ProcessInput>,
    pub identity: ProcessIdentity,
    #[serde(default)]
    pub event_types: Vec<ProcessEventType>,
    pub provenance: ProcessProvenance,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub env_ref: Option<ProcessExecutionEnvRef>,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub wake_target: Option<SessionScope>,
    /// Creation timestamp in milliseconds.
    #[serde(default)]
    pub created_at_ms: u64,
    /// Last-update timestamp in milliseconds.
    #[serde(default)]
    pub updated_at_ms: u64,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub external_ref: Option<ProcessExternalRef>,
    /// Current wait state, if any.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub wait: Option<WaitState>,
    /// Current status; defaults to `Running` when missing.
    #[serde(default)]
    pub status: ProcessStatus,
}
751
/// What a process is waiting on, and since when.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub struct WaitState {
    /// The kind of wait.
    pub kind: WaitKind,
    /// Timestamp in milliseconds — presumably when the wait began; confirm
    /// against the code that sets it.
    pub since_ms: u64,
}
757
/// Kinds of wait a process can be in.
///
/// Serialized as an internally tagged enum under the `kind` key, with
/// snake_case variant names.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
#[serde(tag = "kind", rename_all = "snake_case")]
pub enum WaitKind {
    /// Waiting on a signal described by name, event type, key and ordinal.
    Signal {
        name: String,
        event_type: String,
        key: String,
        ordinal: u64,
    },
}
768
769impl ProcessRecord {
770 pub fn from_registration(registration: ProcessRegistration) -> Self {
771 Self::from_registration_with_clock(registration, &crate::SystemClock)
772 }
773
774 pub fn from_registration_with_clock(
775 mut registration: ProcessRegistration,
776 clock: &dyn crate::Clock,
777 ) -> Self {
778 ensure_core_event_types(&mut registration);
779 validate_process_registration(®istration)
780 .expect("process registration should be valid before record construction");
781 let registration_hash = process_registration_hash(®istration)
782 .expect("process registration should hash before record construction");
783 Self::from_prepared_registration(registration, registration_hash, clock.timestamp_ms())
784 }
785
786 pub fn from_prepared_registration(
787 registration: ProcessRegistration,
788 registration_hash: String,
789 now_ms: u64,
790 ) -> Self {
791 Self {
792 id: registration.id,
793 registration_hash,
794 input: registration.input,
795 identity: registration.identity,
796 event_types: registration.event_types,
797 provenance: registration.provenance,
798 env_ref: registration.env_ref,
799 wake_target: registration.wake_target,
800 created_at_ms: now_ms,
801 updated_at_ms: now_ms,
802 external_ref: None,
803 wait: None,
804 status: ProcessStatus::Running,
805 }
806 }
807
808 pub fn is_terminal(&self) -> bool {
809 self.status.is_terminal()
810 }
811
812 pub fn clear_wake_target_for_session(&mut self, session_id: &str) -> bool {
813 self.clear_wake_target_for_session_with_clock(session_id, &crate::SystemClock)
814 }
815
816 pub fn clear_wake_target_for_session_with_clock(
817 &mut self,
818 session_id: &str,
819 clock: &dyn crate::Clock,
820 ) -> bool {
821 let should_clear = self
822 .wake_target
823 .as_ref()
824 .is_some_and(|scope| scope.session_id == session_id);
825 if should_clear {
826 self.wake_target = None;
827 self.updated_at_ms = clock.timestamp_ms();
828 }
829 should_clear
830 }
831
832 pub fn originator_scope_id(&self) -> String {
833 self.provenance.originator.scope_id()
834 }
835}
836
837#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
844pub struct ProcessIdentity {
845 pub kind: String,
846 #[serde(default, skip_serializing_if = "Option::is_none")]
847 pub label: Option<String>,
848 #[serde(default, skip_serializing_if = "Option::is_none")]
849 pub definition: Option<serde_json::Value>,
850}
851
852impl ProcessIdentity {
853 pub fn new(kind: impl Into<String>) -> Self {
854 Self {
855 kind: kind.into(),
856 label: None,
857 definition: None,
858 }
859 }
860
861 pub fn with_label(mut self, label: Option<impl Into<String>>) -> Self {
862 self.label = label.map(Into::into);
863 self
864 }
865
866 pub fn with_definition(mut self, definition: Option<serde_json::Value>) -> Self {
867 self.definition = definition;
868 self
869 }
870
871 pub fn from_process_input(input: &ProcessInput) -> Self {
872 match input {
873 ProcessInput::ToolCall { call } => {
874 Self::new("tool").with_label(Some(call.tool_name.clone()))
875 }
876 ProcessInput::Engine { kind, .. } => Self::new(kind.clone()),
877 ProcessInput::SessionTurn { create_request, .. } => {
878 let label = create_request
879 .subagent
880 .as_ref()
881 .map(|subagent| subagent.capability.clone())
882 .or_else(|| create_request.usage_source.clone())
883 .or_else(|| create_request.session_id.clone());
884 Self::new("session_turn").with_label(label)
885 }
886 ProcessInput::External { metadata } => {
887 let label = metadata
888 .get("label")
889 .or_else(|| metadata.get("name"))
890 .or_else(|| metadata.get("title"))
891 .and_then(serde_json::Value::as_str)
892 .map(str::to_string);
893 Self::new("external").with_label(label)
894 }
895 }
896 }
897}
898
/// Schema version constant for process leases.
// NOTE(review): presumably the value written to `ProcessLease::schema_version` — confirm against the lease-producing code.
pub const PROCESS_LEASE_SCHEMA_VERSION: u32 = 2;
906
/// A lease on a process held by an owner.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct ProcessLease {
    /// Schema version of this lease record.
    pub schema_version: u32,
    /// Id of the leased process.
    pub process_id: ProcessId,
    /// Identity of the lease owner.
    pub owner: crate::LeaseOwnerIdentity,
    /// Token identifying this lease; copied into `ProcessLeaseCompletion`.
    pub lease_token: String,
    /// Fencing token for the lease.
    pub fencing_token: u64,
    /// Claim time in epoch milliseconds.
    pub claimed_at_epoch_ms: u64,
    /// Expiry time in epoch milliseconds.
    pub expires_at_epoch_ms: u64,
}
936
937#[derive(Clone, Debug, Serialize, Deserialize)]
943pub enum ProcessLeaseClaimOutcome {
944 Acquired(ProcessLease),
945 Busy { holder: ProcessLease },
946}
947
948impl ProcessLeaseClaimOutcome {
949 pub fn acquired(self) -> Option<ProcessLease> {
950 match self {
951 Self::Acquired(lease) => Some(lease),
952 Self::Busy { .. } => None,
953 }
954 }
955}
956
957#[derive(Clone, Debug, Serialize, Deserialize)]
958pub struct ProcessLeaseCompletion {
959 pub process_id: ProcessId,
960 pub lease_token: String,
961}
962
963impl ProcessLeaseCompletion {
964 pub fn from_lease(lease: &ProcessLease) -> Self {
965 Self {
966 process_id: lease.process_id.clone(),
967 lease_token: lease.lease_token.clone(),
968 }
969 }
970}
971
/// Reference to a process tracked by an external backend.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq)]
pub struct ProcessExternalRef {
    /// Name of the backend.
    pub backend: String,
    /// Id within that backend.
    pub id: String,
    /// Optional JSON metadata; omitted from the serialized form when `None`.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub metadata: Option<serde_json::Value>,
}
980
981#[derive(Clone, Debug, Default, PartialEq, Eq, Serialize, Deserialize)]
982pub struct ProcessHandleDescriptor {
983 #[serde(default, skip_serializing_if = "Option::is_none")]
984 pub kind: Option<String>,
985 #[serde(default, skip_serializing_if = "Option::is_none")]
986 pub label: Option<String>,
987}
988
989impl ProcessHandleDescriptor {
990 pub fn new(kind: Option<impl Into<String>>, label: Option<impl Into<String>>) -> Self {
991 Self {
992 kind: kind.map(Into::into),
993 label: label.map(Into::into),
994 }
995 }
996}
997
/// Grant linking a session to a process handle, with the handle's descriptor.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub struct ProcessHandleGrant {
    /// Id of the session named in the grant.
    pub session_id: String,
    /// Id of the granted process.
    pub process_id: ProcessId,
    /// Descriptor for the handle.
    pub descriptor: ProcessHandleDescriptor,
}
1004
/// A handle grant paired with a process record.
pub type ProcessHandleGrantEntry = (ProcessHandleGrant, ProcessRecord);
1006
1007#[derive(Clone, Copy, Debug, Default, PartialEq, Eq, Serialize, Deserialize)]
1008#[serde(rename_all = "snake_case")]
1009pub enum ProcessLifecycleStatus {
1010 #[default]
1011 Running,
1012 Completed,
1013 Failed,
1014 Cancelled,
1015}
1016
1017impl ProcessLifecycleStatus {
1018 pub fn label(self) -> &'static str {
1019 match self {
1020 Self::Running => "running",
1021 Self::Completed => "completed",
1022 Self::Failed => "failed",
1023 Self::Cancelled => "cancelled",
1024 }
1025 }
1026
1027 pub fn is_terminal(self) -> bool {
1028 !matches!(self, Self::Running)
1029 }
1030
1031 pub fn terminal_state(self) -> Option<ProcessTerminalState> {
1032 match self {
1033 Self::Running => None,
1034 Self::Completed => Some(ProcessTerminalState::Completed),
1035 Self::Failed => Some(ProcessTerminalState::Failed),
1036 Self::Cancelled => Some(ProcessTerminalState::Cancelled),
1037 }
1038 }
1039}
1040
1041impl From<&ProcessStatus> for ProcessLifecycleStatus {
1042 fn from(status: &ProcessStatus) -> Self {
1043 match status {
1044 ProcessStatus::Running => Self::Running,
1045 ProcessStatus::Completed { .. } => Self::Completed,
1046 ProcessStatus::Failed { .. } => Self::Failed,
1047 ProcessStatus::Cancelled { .. } => Self::Cancelled,
1048 }
1049 }
1050}
1051
1052impl From<ProcessStatus> for ProcessLifecycleStatus {
1053 fn from(status: ProcessStatus) -> Self {
1054 Self::from(&status)
1055 }
1056}
1057
1058#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
1059pub struct ProcessHandleSummary {
1060 #[serde(rename = "__handle__")]
1061 pub handle_type: String,
1062 pub id: ProcessId,
1063 pub process_id: ProcessId,
1064 pub descriptor: ProcessHandleDescriptor,
1065 #[serde(default, skip_serializing_if = "Option::is_none")]
1066 pub definition: Option<serde_json::Value>,
1067 pub status: ProcessLifecycleStatus,
1068}
1069
1070impl ProcessHandleSummary {
1071 pub fn new(
1072 process_id: impl Into<ProcessId>,
1073 descriptor: ProcessHandleDescriptor,
1074 status: ProcessLifecycleStatus,
1075 ) -> Self {
1076 let process_id = process_id.into();
1077 Self {
1078 handle_type: "process".to_string(),
1079 id: process_id.clone(),
1080 process_id,
1081 descriptor,
1082 definition: None,
1083 status,
1084 }
1085 }
1086
1087 pub fn with_definition(mut self, definition: Option<serde_json::Value>) -> Self {
1088 self.definition = definition;
1089 self
1090 }
1091
1092 pub fn from_grant_record(grant: ProcessHandleGrant, record: ProcessRecord) -> Self {
1093 let definition = record.identity.definition.clone();
1094 Self::new(
1095 record.id,
1096 grant.descriptor,
1097 ProcessLifecycleStatus::from(record.status),
1098 )
1099 .with_definition(definition)
1100 }
1101}
1102
1103impl From<ProcessHandleGrantEntry> for ProcessHandleSummary {
1104 fn from((grant, record): ProcessHandleGrantEntry) -> Self {
1105 Self::from_grant_record(grant, record)
1106 }
1107}
1108
1109#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
1110pub struct ProcessCancelSummary {
1111 pub process_id: ProcessId,
1112 pub status: ProcessLifecycleStatus,
1113}
1114
1115impl ProcessCancelSummary {
1116 pub fn from_record(record: ProcessRecord) -> Self {
1117 Self {
1118 process_id: record.id,
1119 status: ProcessLifecycleStatus::from(record.status),
1120 }
1121 }
1122}
1123
1124#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)]
1125pub enum ProcessStatusFilter {
1126 #[default]
1127 Running,
1128 Completed,
1129 Failed,
1130 Cancelled,
1131 Any,
1132}
1133
1134impl ProcessStatusFilter {
1135 pub fn decode(value: Option<&str>) -> Result<Self, String> {
1136 match value.unwrap_or("running") {
1137 "running" => Ok(Self::Running),
1138 "completed" => Ok(Self::Completed),
1139 "failed" => Ok(Self::Failed),
1140 "cancelled" => Ok(Self::Cancelled),
1141 "any" => Ok(Self::Any),
1142 other => Err(format!(
1143 "processes.list status must be `running`, `completed`, `failed`, `cancelled`, or `any`, got `{other}`"
1144 )),
1145 }
1146 }
1147
1148 pub fn list_mode(self) -> ProcessListMode {
1149 match self {
1150 Self::Running => ProcessListMode::Live,
1151 Self::Completed | Self::Failed | Self::Cancelled | Self::Any => ProcessListMode::All,
1152 }
1153 }
1154
1155 pub fn matches(self, status: ProcessLifecycleStatus) -> bool {
1156 match self {
1157 Self::Running => status == ProcessLifecycleStatus::Running,
1158 Self::Completed => status == ProcessLifecycleStatus::Completed,
1159 Self::Failed => status == ProcessLifecycleStatus::Failed,
1160 Self::Cancelled => status == ProcessLifecycleStatus::Cancelled,
1161 Self::Any => true,
1162 }
1163 }
1164}
1165
1166#[derive(Clone, Debug, Default, PartialEq)]
1167pub struct ProcessListFilter {
1168 pub definition: Option<serde_json::Value>,
1169 pub status: ProcessStatusFilter,
1170 pub waiting: Option<bool>,
1171}
1172
1173impl ProcessListFilter {
1174 pub fn decode(args: &serde_json::Value) -> Result<Self, String> {
1175 let map = args
1176 .as_object()
1177 .ok_or_else(|| "processes.list expects a record of process filters".to_string())?;
1178 for key in map.keys() {
1179 match key.as_str() {
1180 "definition" | "status" | "waiting" => {}
1181 _ => return Err(format!("processes.list unknown filter `{key}`")),
1182 }
1183 }
1184 let definition = args.get("definition").cloned();
1185 let status =
1186 ProcessStatusFilter::decode(args.get("status").and_then(serde_json::Value::as_str))?;
1187 let waiting = args
1188 .get("waiting")
1189 .map(|value| {
1190 value
1191 .as_bool()
1192 .ok_or_else(|| "processes.list `waiting` filter must be a boolean".to_string())
1193 })
1194 .transpose()?;
1195 Ok(Self {
1196 definition,
1197 status,
1198 waiting,
1199 })
1200 }
1201
1202 pub fn list_mode(&self) -> ProcessListMode {
1203 self.status.list_mode()
1204 }
1205
1206 pub fn matches_entry(&self, entry: &ProcessHandleGrantEntry) -> bool {
1207 let (_grant, record) = entry;
1208 self.matches_record(record)
1209 }
1210
1211 pub fn matches_record(&self, record: &ProcessRecord) -> bool {
1212 let status = ProcessLifecycleStatus::from(&record.status);
1213 self.status.matches(status)
1214 && self
1215 .definition
1216 .as_ref()
1217 .is_none_or(|definition| record.identity.definition.as_ref() == Some(definition))
1218 && self
1219 .waiting
1220 .is_none_or(|waiting| record.wait.is_some() == waiting)
1221 }
1222}
1223
/// Listing mode selected for a process listing.
///
/// Serialized in snake case, i.e. as `"live"` or `"all"`.
#[derive(Clone, Copy, Debug, Default, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum ProcessListMode {
    /// The default mode.
    // NOTE(review): presumably restricts the listing to processes that are still live — confirm
    // against the code that consumes this mode.
    #[default]
    Live,
    /// The non-default mode.
    // NOTE(review): presumably also includes processes that already finished — confirm.
    All,
}
1231
1232impl ProcessListMode {
1233 pub fn as_str(self) -> &'static str {
1234 match self {
1235 Self::Live => "live",
1236 Self::All => "all",
1237 }
1238 }
1239}
1240
/// Pairs a session scope with a process handle descriptor.
///
/// NOTE(review): presumably describes the handle a session receives when it starts a process;
/// the code that builds and consumes this type is outside this section — confirm.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub struct ProcessStartGrant {
    /// Session scope the grant refers to.
    pub session_scope: SessionScope,
    /// Descriptor of the process handle.
    pub descriptor: ProcessHandleDescriptor,
}
1246
/// Report about process-related state touched when a session is deleted.
///
/// NOTE(review): the field descriptions below are read off the field names; the code that
/// fills in this report is outside this section — confirm against it.
#[derive(Clone, Debug, Default, PartialEq, Eq, Serialize, Deserialize)]
pub struct ProcessSessionDeleteReport {
    /// Identifier of the session the report is about.
    pub session_id: String,
    /// Number of process handles revoked (presumably those held by the session).
    pub revoked_handle_count: usize,
    /// Number of wakes deleted (presumably those registered for the session).
    pub deleted_wake_count: usize,
    /// Ids of processes reported as orphaned.
    pub orphaned_process_ids: Vec<String>,
    /// Ids of processes reported as preserved.
    pub preserved_process_ids: Vec<String>,
}
1255
#[cfg(test)]
mod tests {
    use serde_json::json;

    use super::*;

    /// Builds a JSON definition reference with `component`, `pos`, and `name` keys.
    fn process_value(component: &str, pos: usize, name: &str) -> serde_json::Value {
        json!({
            "component": component,
            "pos": pos,
            "name": name,
        })
    }

    /// Builds a `Completed` status wrapping a successful `true` tool output.
    fn completed_status() -> ProcessStatus {
        ProcessStatus::Completed {
            await_output: ProcessAwaitOutput::from_tool_output(crate::ToolCallOutput::success(
                json!(true),
            )),
        }
    }

    /// Builds a `(grant, record)` entry for a `test-engine` process.
    ///
    /// The record carries `definition` and `process_name` in its identity and is forced into
    /// `status` after registration.
    fn engine_entry(
        process_id: &str,
        definition: serde_json::Value,
        process_name: &str,
        status: ProcessStatus,
    ) -> ProcessHandleGrantEntry {
        let mut record = ProcessRecord::from_registration(
            ProcessRegistration::new(
                process_id,
                ProcessInput::Engine {
                    kind: "test-engine".to_string(),
                    // `json!` only borrows interpolated values, so `definition` can still be
                    // moved into the identity below without cloning it here.
                    payload: json!({
                        "definition": definition,
                        "label": process_name,
                    }),
                },
                ProcessProvenance::host(),
            )
            .with_identity(
                ProcessIdentity::new("test-engine")
                    .with_label(Some(process_name))
                    .with_definition(Some(definition)),
            )
            .with_execution_env_ref(Some(ProcessExecutionEnvRef::new(format!(
                "process-env:test:{process_id}"
            )))),
        );
        record.status = status;
        (
            ProcessHandleGrant {
                session_id: "session".to_string(),
                process_id: process_id.to_string(),
                descriptor: ProcessHandleDescriptor::new(Some("test-engine"), Some(process_name)),
            },
            record,
        )
    }

    #[test]
    fn process_list_filter_matches_definition_and_status() {
        let target_ref = process_value("target", 0, "target");
        let other_ref = process_value("other", 1, "other");
        let filter = ProcessListFilter::decode(&json!({
            "definition": target_ref,
            "status": "completed"
        }))
        .expect("decode filter");

        let matching = engine_entry("matching", target_ref, "target", completed_status());
        let wrong_definition =
            engine_entry("wrong-definition", other_ref, "other", completed_status());

        assert_eq!(filter.list_mode(), ProcessListMode::All);
        assert!(filter.matches_entry(&matching));
        assert!(!filter.matches_entry(&wrong_definition));
    }

    #[test]
    fn process_list_filter_matches_waiting_facet() {
        let process_ref = process_value("target", 0, "target");
        let mut waiting_entry = engine_entry(
            "waiting",
            process_ref.clone(),
            "target",
            ProcessStatus::Running,
        );
        waiting_entry.1.wait = Some(WaitState {
            since_ms: 42,
            kind: WaitKind::Signal {
                name: "ready".to_string(),
                event_type: "signal.ready".to_string(),
                key: "process:waiting:signal.ready:1".to_string(),
                ordinal: 1,
            },
        });
        let idle_entry = engine_entry("idle", process_ref, "target", ProcessStatus::Running);
        let waiting_filter =
            ProcessListFilter::decode(&json!({ "waiting": true })).expect("decode waiting filter");
        let idle_filter =
            ProcessListFilter::decode(&json!({ "waiting": false })).expect("decode idle filter");

        assert_eq!(waiting_filter.list_mode(), ProcessListMode::Live);
        assert!(waiting_filter.matches_entry(&waiting_entry));
        assert!(!waiting_filter.matches_entry(&idle_entry));
        assert!(!idle_filter.matches_entry(&waiting_entry));
        assert!(idle_filter.matches_entry(&idle_entry));
        assert!(
            ProcessListFilter::decode(&json!({ "waiting": "yes" }))
                .expect_err("invalid waiting filter")
                .contains("must be a boolean")
        );
    }

    #[test]
    fn process_list_filter_rejects_malformed_arguments() {
        // Arguments must be a JSON object.
        assert!(
            ProcessListFilter::decode(&json!(["completed"]))
                .expect_err("non-record arguments")
                .contains("expects a record")
        );
        // Keys other than `definition`, `status`, and `waiting` are rejected by name.
        assert!(
            ProcessListFilter::decode(&json!({ "state": "completed" }))
                .expect_err("unknown filter key")
                .contains("unknown filter `state`")
        );
    }
}