1use std::collections::BTreeMap;
2use std::fmt;
3use std::sync::Arc;
4use std::sync::Mutex;
5
6use serde::{Deserialize, Serialize};
7
8use super::events::{
9 ProcessAwaitOutput, ProcessEventType, ProcessTerminalSemantics, ProcessTerminalState,
10 default_process_event_types,
11};
12use super::validation::{
13 ensure_core_event_types, process_registration_hash, validate_process_registration,
14};
15
16pub type ProcessId = String;
17
18#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize)]
19#[serde(transparent)]
20pub struct SessionScopeId(String);
21
22impl SessionScopeId {
23 pub fn new(value: impl Into<String>) -> Self {
24 Self(value.into())
25 }
26
27 pub fn as_str(&self) -> &str {
28 &self.0
29 }
30}
31
32impl fmt::Display for SessionScopeId {
33 fn fmt(&self, formatter: &mut fmt::Formatter<'_>) -> fmt::Result {
34 formatter.write_str(&self.0)
35 }
36}
37
38impl From<String> for SessionScopeId {
39 fn from(value: String) -> Self {
40 Self::new(value)
41 }
42}
43
44impl From<&str> for SessionScopeId {
45 fn from(value: &str) -> Self {
46 Self::new(value)
47 }
48}
49
50#[derive(Debug, Serialize, Deserialize)]
59#[serde(tag = "type", rename_all = "snake_case")]
60pub enum ProcessInput {
61 ToolCall {
62 call: crate::PreparedToolCall,
63 },
64 Engine {
65 kind: String,
66 #[serde(default)]
67 payload: serde_json::Value,
68 },
69 SessionTurn {
70 create_request: Box<crate::SessionCreateRequest>,
71 turn_input: Box<crate::TurnInput>,
72 output_contract: crate::ToolOutputContract,
73 },
74 External {
75 #[serde(default)]
76 metadata: serde_json::Value,
77 },
78}
79
80impl Clone for ProcessInput {
81 fn clone(&self) -> Self {
82 match self {
83 Self::ToolCall { call } => Self::ToolCall { call: call.clone() },
84 Self::Engine { kind, payload } => Self::Engine {
85 kind: kind.clone(),
86 payload: payload.clone(),
87 },
88 Self::SessionTurn {
89 create_request,
90 turn_input,
91 output_contract,
92 } => Self::SessionTurn {
93 create_request: create_request.clone(),
94 turn_input: turn_input.clone(),
95 output_contract: output_contract.clone(),
96 },
97 Self::External { metadata } => Self::External {
98 metadata: metadata.clone(),
99 },
100 }
101 }
102}
103
104impl PartialEq for ProcessInput {
105 fn eq(&self, other: &Self) -> bool {
106 serde_json::to_value(self).ok() == serde_json::to_value(other).ok()
107 }
108}
109
110impl ProcessInput {
111 pub fn engine_kind(&self) -> &'static str {
112 match self {
113 Self::ToolCall { .. } => "tool",
114 Self::Engine { .. } => "engine",
115 Self::SessionTurn { .. } => "session_turn",
116 Self::External { .. } => "external",
117 }
118 }
119
120 pub fn engine_specific_kind(&self) -> Option<&str> {
121 match self {
122 Self::Engine { kind, .. } => Some(kind.as_str()),
123 _ => None,
124 }
125 }
126}
127
128#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
129#[serde(transparent)]
130pub struct ProcessExecutionEnvRef(String);
131
132impl ProcessExecutionEnvRef {
133 pub fn new(value: impl Into<String>) -> Self {
134 Self(value.into())
135 }
136
137 pub fn as_str(&self) -> &str {
138 &self.0
139 }
140}
141
142impl fmt::Display for ProcessExecutionEnvRef {
143 fn fmt(&self, formatter: &mut fmt::Formatter<'_>) -> fmt::Result {
144 formatter.write_str(&self.0)
145 }
146}
147
148#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
149#[serde(deny_unknown_fields)]
150pub struct ProcessExecutionEnvSpec {
151 #[serde(default)]
152 pub plugin_options: crate::PluginOptions,
153 #[serde(default)]
154 pub policy: crate::SessionPolicy,
155}
156
157impl ProcessExecutionEnvSpec {
158 pub fn new(plugin_options: crate::PluginOptions, policy: crate::SessionPolicy) -> Self {
159 Self {
160 plugin_options,
161 policy,
162 }
163 }
164
165 pub fn stable_ref(&self) -> Result<ProcessExecutionEnvRef, serde_json::Error> {
166 crate::stable_hash::stable_json_sha256_hex(self)
167 .map(|hash| ProcessExecutionEnvRef::new(format!("process-env:sha256:{hash}")))
168 }
169
170 pub fn to_store_bytes(&self) -> Result<Vec<u8>, serde_json::Error> {
171 serde_json::to_vec(self)
172 }
173
174 pub fn from_store_bytes(bytes: &[u8]) -> Result<Self, serde_json::Error> {
175 serde_json::from_slice(bytes)
176 }
177}
178
179#[async_trait::async_trait]
180pub trait ProcessExecutionEnvStore: Send + Sync {
181 fn durability_tier(&self) -> crate::DurabilityTier {
182 crate::DurabilityTier::Inline
183 }
184
185 async fn put_process_execution_env(
186 &self,
187 env_ref: &ProcessExecutionEnvRef,
188 bytes: &[u8],
189 ) -> Result<(), crate::PluginError>;
190
191 async fn get_process_execution_env(
192 &self,
193 env_ref: &ProcessExecutionEnvRef,
194 ) -> Result<Option<Vec<u8>>, crate::PluginError>;
195}
196
197#[derive(Default)]
198pub struct InMemoryProcessExecutionEnvStore {
199 envs: Mutex<BTreeMap<String, Vec<u8>>>,
200}
201
202impl InMemoryProcessExecutionEnvStore {
203 pub fn new() -> Self {
204 Self::default()
205 }
206}
207
208#[async_trait::async_trait]
209impl ProcessExecutionEnvStore for InMemoryProcessExecutionEnvStore {
210 async fn put_process_execution_env(
211 &self,
212 env_ref: &ProcessExecutionEnvRef,
213 bytes: &[u8],
214 ) -> Result<(), crate::PluginError> {
215 self.envs
216 .lock()
217 .map_err(|_| {
218 crate::PluginError::Session("process execution env store lock poisoned".to_string())
219 })?
220 .insert(env_ref.as_str().to_string(), bytes.to_vec());
221 Ok(())
222 }
223
224 async fn get_process_execution_env(
225 &self,
226 env_ref: &ProcessExecutionEnvRef,
227 ) -> Result<Option<Vec<u8>>, crate::PluginError> {
228 Ok(self
229 .envs
230 .lock()
231 .map_err(|_| {
232 crate::PluginError::Session("process execution env store lock poisoned".to_string())
233 })?
234 .get(env_ref.as_str())
235 .cloned())
236 }
237}
238
239pub async fn persist_process_execution_env(
240 env_store: &dyn ProcessExecutionEnvStore,
241 spec: &ProcessExecutionEnvSpec,
242) -> Result<ProcessExecutionEnvRef, crate::PluginError> {
243 let env_ref = spec.stable_ref().map_err(|err| {
244 crate::PluginError::Session(format!("failed to hash process execution env: {err}"))
245 })?;
246 let bytes = spec.to_store_bytes().map_err(|err| {
247 crate::PluginError::Session(format!("failed to encode process execution env: {err}"))
248 })?;
249 env_store
250 .put_process_execution_env(&env_ref, &bytes)
251 .await?;
252 Ok(env_ref)
253}
254
255pub async fn load_process_execution_env(
256 env_store: &dyn ProcessExecutionEnvStore,
257 env_ref: &ProcessExecutionEnvRef,
258) -> Result<ProcessExecutionEnvSpec, crate::PluginError> {
259 let bytes = env_store
260 .get_process_execution_env(env_ref)
261 .await?
262 .ok_or_else(|| {
263 crate::PluginError::Session(format!("missing process execution env `{env_ref}`"))
264 })?;
265 ProcessExecutionEnvSpec::from_store_bytes(&bytes).map_err(|err| {
266 crate::PluginError::Session(format!(
267 "failed to decode process execution env `{env_ref}`: {err}"
268 ))
269 })
270}
271
272#[derive(Clone, Debug, Default, Serialize, Deserialize)]
275pub struct ProcessExecutionContext {
276 #[serde(default, skip_serializing_if = "Option::is_none")]
277 pub causal_invocation: Option<crate::RuntimeInvocation>,
278}
279
280impl ProcessExecutionContext {
281 pub fn with_causal_invocation(mut self, invocation: Option<crate::RuntimeInvocation>) -> Self {
282 self.causal_invocation = invocation;
283 self
284 }
285
286 pub fn is_empty(&self) -> bool {
287 self.causal_invocation.is_none()
288 }
289}
290
291#[derive(Clone)]
292pub struct ProcessOpScope<'scope> {
293 pub(crate) parent_invocation: Option<crate::RuntimeInvocation>,
294 pub(crate) effect_controller: crate::runtime::RuntimeEffectControllerHandle<'scope>,
295 pub(crate) agent_frame_id: Option<crate::AgentFrameId>,
296 pub(crate) target_agent_frame_id: Option<crate::AgentFrameId>,
297}
298
299impl<'scope> ProcessOpScope<'scope> {
300 pub fn new(scoped_effect_controller: crate::ScopedEffectController<'scope>) -> Self {
301 Self {
302 parent_invocation: None,
303 effect_controller: crate::runtime::RuntimeEffectControllerHandle::borrowed(
304 scoped_effect_controller,
305 ),
306 agent_frame_id: None,
307 target_agent_frame_id: None,
308 }
309 }
310
311 pub fn with_parent_invocation(
312 mut self,
313 parent_invocation: Option<crate::RuntimeInvocation>,
314 ) -> Self {
315 self.parent_invocation = parent_invocation;
316 self
317 }
318
319 pub fn with_agent_frame_id(mut self, agent_frame_id: Option<crate::AgentFrameId>) -> Self {
320 self.agent_frame_id = agent_frame_id;
321 self
322 }
323
324 pub fn with_target_agent_frame_id(
325 mut self,
326 agent_frame_id: Option<crate::AgentFrameId>,
327 ) -> Self {
328 self.target_agent_frame_id = agent_frame_id;
329 self
330 }
331
332 pub fn agent_frame_id(&self) -> Option<&str> {
333 self.agent_frame_id.as_deref()
334 }
335
336 pub fn target_agent_frame_id(&self) -> Option<&str> {
337 self.target_agent_frame_id.as_deref()
338 }
339
340 pub(crate) fn controller(&self) -> &dyn crate::RuntimeEffectController {
341 self.effect_controller.controller()
342 }
343}
344
345#[derive(Clone, Debug, Default)]
346pub struct ProcessStartOptions {
347 pub descriptor: Option<ProcessHandleDescriptor>,
348 pub spawn_provenance: Option<ProcessSpawnProvenance>,
356}
357
358#[derive(Clone, Debug, PartialEq, Eq)]
363pub struct ProcessSpawnProvenance {
364 pub originator: ProcessOriginator,
365 pub wake_target: Option<SessionScope>,
366}
367
368impl ProcessStartOptions {
369 pub fn new() -> Self {
370 Self::default()
371 }
372
373 pub fn with_descriptor(mut self, descriptor: ProcessHandleDescriptor) -> Self {
374 self.descriptor = Some(descriptor);
375 self
376 }
377
378 pub fn with_optional_descriptor(mut self, descriptor: Option<ProcessHandleDescriptor>) -> Self {
379 self.descriptor = descriptor;
380 self
381 }
382
383 pub fn with_spawn_provenance(mut self, spawn_provenance: ProcessSpawnProvenance) -> Self {
384 self.spawn_provenance = Some(spawn_provenance);
385 self
386 }
387
388 pub fn execution_context(&self, scope: &ProcessOpScope<'_>) -> ProcessExecutionContext {
389 ProcessExecutionContext {
390 causal_invocation: scope.parent_invocation.clone(),
391 }
392 }
393}
394
395#[derive(Clone, Debug, Serialize, Deserialize)]
397pub struct ProcessStartRequest {
398 pub id: ProcessId,
399 pub input: ProcessInput,
400 #[serde(default, skip_serializing_if = "Option::is_none")]
401 pub env_spec: Option<ProcessExecutionEnvSpec>,
402 pub originator: ProcessOriginator,
403 #[serde(default, skip_serializing_if = "Option::is_none")]
404 pub wake_target: Option<SessionScope>,
405 #[serde(default, skip_serializing_if = "Option::is_none")]
406 pub grant: Option<ProcessStartGrant>,
407 #[serde(default)]
408 pub event_types: Vec<ProcessEventType>,
409}
410
411impl ProcessStartRequest {
412 pub fn new(
413 id: impl Into<ProcessId>,
414 input: ProcessInput,
415 originator: ProcessOriginator,
416 ) -> Self {
417 Self {
418 id: id.into(),
419 input,
420 env_spec: None,
421 originator,
422 wake_target: None,
423 grant: None,
424 event_types: default_process_event_types(),
425 }
426 }
427
428 pub fn external(
429 id: impl Into<ProcessId>,
430 originator: ProcessOriginator,
431 metadata: serde_json::Value,
432 ) -> Self {
433 Self::new(id, ProcessInput::External { metadata }, originator)
434 }
435
436 pub fn with_env_spec(mut self, env_spec: ProcessExecutionEnvSpec) -> Self {
437 self.env_spec = Some(env_spec);
438 self
439 }
440
441 pub fn with_wake_target(mut self, wake_target: Option<SessionScope>) -> Self {
442 self.wake_target = wake_target;
443 self
444 }
445
446 pub fn with_grant(mut self, grant: Option<ProcessStartGrant>) -> Self {
447 self.grant = grant;
448 self
449 }
450
451 pub fn with_event_types(
452 mut self,
453 event_types: impl IntoIterator<Item = ProcessEventType>,
454 ) -> Self {
455 self.event_types = event_types.into_iter().collect();
456 self
457 }
458
459 pub fn with_extra_event_types(
460 mut self,
461 event_types: impl IntoIterator<Item = ProcessEventType>,
462 ) -> Self {
463 self.event_types.extend(event_types);
464 self
465 }
466
467 pub fn into_registration(self, env_ref: Option<ProcessExecutionEnvRef>) -> ProcessRegistration {
468 ProcessRegistration::new(self.id, self.input, ProcessProvenance::new(self.originator))
469 .with_event_types(self.event_types)
470 .with_execution_env_ref(env_ref)
471 .with_wake_target(self.wake_target)
472 }
473}
474
475#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
476pub struct SessionScope {
477 pub session_id: String,
478 #[serde(default, skip_serializing_if = "Option::is_none")]
479 pub agent_frame_id: Option<crate::AgentFrameId>,
480}
481
482#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
483pub struct ProcessProvenance {
484 pub originator: ProcessOriginator,
485 #[serde(default, skip_serializing_if = "Option::is_none")]
486 pub caused_by: Option<crate::CausalRef>,
487}
488
489impl ProcessProvenance {
490 pub fn new(originator: ProcessOriginator) -> Self {
491 Self {
492 originator,
493 caused_by: None,
494 }
495 }
496
497 pub fn host() -> Self {
498 Self::new(ProcessOriginator::host())
499 }
500
501 pub fn session(scope: SessionScope) -> Self {
502 Self::new(ProcessOriginator::session(scope))
503 }
504
505 pub fn with_caused_by(mut self, caused_by: Option<crate::CausalRef>) -> Self {
506 self.caused_by = caused_by;
507 self
508 }
509}
510
511#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
512#[serde(tag = "type", rename_all = "snake_case")]
513pub enum ProcessOriginator {
514 Host,
515 Session { scope: SessionScope },
516}
517
518impl ProcessOriginator {
519 pub fn host() -> Self {
520 Self::Host
521 }
522
523 pub fn session(scope: SessionScope) -> Self {
524 Self::Session { scope }
525 }
526
527 pub fn scope_id(&self) -> String {
528 match self {
529 Self::Host => "host".to_string(),
530 Self::Session { scope } => scope.id().to_string(),
531 }
532 }
533}
534
535impl SessionScope {
536 pub fn new(session_id: impl Into<String>) -> Self {
537 Self {
538 session_id: session_id.into(),
539 agent_frame_id: None,
540 }
541 }
542
543 pub fn for_agent_frame(
544 session_id: impl Into<String>,
545 agent_frame_id: impl Into<crate::AgentFrameId>,
546 ) -> Self {
547 Self {
548 session_id: session_id.into(),
549 agent_frame_id: Some(agent_frame_id.into()),
550 }
551 }
552
553 pub fn id(&self) -> SessionScopeId {
554 match self.agent_frame_id.as_deref() {
555 Some(frame_id) if !frame_id.is_empty() => {
556 SessionScopeId::new(format!("session:{}/frame:{frame_id}", self.session_id))
557 }
558 _ => SessionScopeId::new(format!("session:{}", self.session_id)),
559 }
560 }
561
562 pub fn is_empty(&self) -> bool {
563 self.session_id.is_empty()
564 }
565}
566
567#[derive(Debug, Serialize, Deserialize)]
569pub struct ProcessRegistration {
570 pub id: ProcessId,
571 pub input: Arc<ProcessInput>,
572 pub identity: ProcessIdentity,
573 #[serde(default)]
574 pub event_types: Vec<ProcessEventType>,
575 pub provenance: ProcessProvenance,
576 #[serde(default, skip_serializing_if = "Option::is_none")]
577 pub env_ref: Option<ProcessExecutionEnvRef>,
578 #[serde(default, skip_serializing_if = "Option::is_none")]
579 pub wake_target: Option<SessionScope>,
580}
581
582impl Clone for ProcessRegistration {
583 fn clone(&self) -> Self {
584 Self {
585 id: self.id.clone(),
586 input: Arc::clone(&self.input),
587 identity: self.identity.clone(),
588 event_types: self.event_types.clone(),
589 provenance: self.provenance.clone(),
590 env_ref: self.env_ref.clone(),
591 wake_target: self.wake_target.clone(),
592 }
593 }
594}
595
596impl ProcessRegistration {
597 pub fn new(
598 id: impl Into<ProcessId>,
599 input: ProcessInput,
600 provenance: ProcessProvenance,
601 ) -> Self {
602 let identity = ProcessIdentity::from_process_input(&input);
603 Self {
604 id: id.into(),
605 input: Arc::new(input),
606 identity,
607 event_types: default_process_event_types(),
608 provenance,
609 env_ref: None,
610 wake_target: None,
611 }
612 }
613
614 pub(crate) fn session_start_draft(id: impl Into<ProcessId>, input: ProcessInput) -> Self {
615 Self::new(id, input, ProcessProvenance::host())
616 }
617
618 pub fn with_process_provenance(mut self, provenance: ProcessProvenance) -> Self {
619 self.provenance = provenance;
620 self
621 }
622
623 pub fn with_execution_env_ref(mut self, env_ref: Option<ProcessExecutionEnvRef>) -> Self {
624 self.env_ref = env_ref;
625 self
626 }
627
628 pub fn with_wake_target(mut self, wake_target: Option<SessionScope>) -> Self {
629 self.wake_target = wake_target;
630 self
631 }
632
633 pub fn with_identity(mut self, identity: ProcessIdentity) -> Self {
634 self.identity = identity;
635 self
636 }
637
638 pub fn with_event_types(
639 mut self,
640 event_types: impl IntoIterator<Item = ProcessEventType>,
641 ) -> Self {
642 self.event_types = event_types.into_iter().collect();
643 self
644 }
645
646 pub fn with_extra_event_types(
647 mut self,
648 event_types: impl IntoIterator<Item = ProcessEventType>,
649 ) -> Self {
650 self.event_types.extend(event_types);
651 self
652 }
653}
654
655#[derive(Clone, Debug, Default, PartialEq, Serialize, Deserialize)]
656#[serde(tag = "state", rename_all = "snake_case")]
657pub enum ProcessStatus {
658 #[default]
659 Running,
660 Completed {
661 await_output: ProcessAwaitOutput,
662 },
663 Failed {
664 await_output: ProcessAwaitOutput,
665 },
666 Cancelled {
667 await_output: ProcessAwaitOutput,
668 },
669}
670
671impl ProcessStatus {
672 pub fn from_terminal(terminal: ProcessTerminalSemantics) -> Self {
673 match terminal.state {
674 ProcessTerminalState::Completed => Self::Completed {
675 await_output: terminal.await_output,
676 },
677 ProcessTerminalState::Failed => Self::Failed {
678 await_output: terminal.await_output,
679 },
680 ProcessTerminalState::Cancelled => Self::Cancelled {
681 await_output: terminal.await_output,
682 },
683 }
684 }
685
686 pub fn is_terminal(&self) -> bool {
687 !matches!(self, Self::Running)
688 }
689
690 pub fn label(&self) -> &'static str {
691 match self {
692 Self::Running => "running",
693 Self::Completed { .. } => "completed",
694 Self::Failed { .. } => "failed",
695 Self::Cancelled { .. } => "cancelled",
696 }
697 }
698
699 pub fn terminal_state(&self) -> Option<ProcessTerminalState> {
700 match self {
701 Self::Running => None,
702 Self::Completed { .. } => Some(ProcessTerminalState::Completed),
703 Self::Failed { .. } => Some(ProcessTerminalState::Failed),
704 Self::Cancelled { .. } => Some(ProcessTerminalState::Cancelled),
705 }
706 }
707
708 pub fn await_output(&self) -> Option<&ProcessAwaitOutput> {
709 match self {
710 Self::Running => None,
711 Self::Completed { await_output }
712 | Self::Failed { await_output }
713 | Self::Cancelled { await_output } => Some(await_output),
714 }
715 }
716
717 pub fn terminal_semantics(&self) -> Option<ProcessTerminalSemantics> {
718 Some(ProcessTerminalSemantics {
719 state: self.terminal_state()?,
720 await_output: self.await_output()?.clone(),
721 })
722 }
723}
724
725#[derive(Clone, Debug, Serialize, Deserialize)]
728pub struct ProcessRecord {
729 pub id: ProcessId,
730 pub registration_hash: String,
731 pub input: Arc<ProcessInput>,
732 pub identity: ProcessIdentity,
733 #[serde(default)]
734 pub event_types: Vec<ProcessEventType>,
735 pub provenance: ProcessProvenance,
736 #[serde(default, skip_serializing_if = "Option::is_none")]
737 pub env_ref: Option<ProcessExecutionEnvRef>,
738 #[serde(default, skip_serializing_if = "Option::is_none")]
739 pub wake_target: Option<SessionScope>,
740 #[serde(default)]
741 pub created_at_ms: u64,
742 #[serde(default)]
743 pub updated_at_ms: u64,
744 #[serde(default, skip_serializing_if = "Option::is_none")]
745 pub external_ref: Option<ProcessExternalRef>,
746 #[serde(default, skip_serializing_if = "Option::is_none")]
747 pub wait: Option<WaitState>,
748 #[serde(default)]
749 pub status: ProcessStatus,
750}
751
752#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
753pub struct WaitState {
754 pub kind: WaitKind,
755 pub since_ms: u64,
756}
757
758#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
759#[serde(tag = "kind", rename_all = "snake_case")]
760pub enum WaitKind {
761 Signal {
762 name: String,
763 event_type: String,
764 key: String,
765 ordinal: u64,
766 },
767}
768
769impl ProcessRecord {
770 pub fn from_registration(registration: ProcessRegistration) -> Self {
771 Self::from_registration_with_clock(registration, &crate::SystemClock)
772 }
773
774 pub fn from_registration_with_clock(
775 mut registration: ProcessRegistration,
776 clock: &dyn crate::Clock,
777 ) -> Self {
778 ensure_core_event_types(&mut registration);
779 validate_process_registration(®istration)
780 .expect("process registration should be valid before record construction");
781 let registration_hash = process_registration_hash(®istration)
782 .expect("process registration should hash before record construction");
783 Self::from_prepared_registration(registration, registration_hash, clock.timestamp_ms())
784 }
785
786 pub fn from_prepared_registration(
787 registration: ProcessRegistration,
788 registration_hash: String,
789 now_ms: u64,
790 ) -> Self {
791 Self {
792 id: registration.id,
793 registration_hash,
794 input: registration.input,
795 identity: registration.identity,
796 event_types: registration.event_types,
797 provenance: registration.provenance,
798 env_ref: registration.env_ref,
799 wake_target: registration.wake_target,
800 created_at_ms: now_ms,
801 updated_at_ms: now_ms,
802 external_ref: None,
803 wait: None,
804 status: ProcessStatus::Running,
805 }
806 }
807
808 pub fn is_terminal(&self) -> bool {
809 self.status.is_terminal()
810 }
811
812 pub fn clear_wake_target_for_session(&mut self, session_id: &str) -> bool {
813 self.clear_wake_target_for_session_with_clock(session_id, &crate::SystemClock)
814 }
815
816 pub fn clear_wake_target_for_session_with_clock(
817 &mut self,
818 session_id: &str,
819 clock: &dyn crate::Clock,
820 ) -> bool {
821 let should_clear = self
822 .wake_target
823 .as_ref()
824 .is_some_and(|scope| scope.session_id == session_id);
825 if should_clear {
826 self.wake_target = None;
827 self.updated_at_ms = clock.timestamp_ms();
828 }
829 should_clear
830 }
831
832 pub fn originator_scope_id(&self) -> String {
833 self.provenance.originator.scope_id()
834 }
835}
836
837#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
844pub struct ProcessIdentity {
845 pub kind: String,
846 #[serde(default, skip_serializing_if = "Option::is_none")]
847 pub label: Option<String>,
848 #[serde(default, skip_serializing_if = "Option::is_none")]
849 pub definition: Option<serde_json::Value>,
850}
851
852impl ProcessIdentity {
853 pub fn new(kind: impl Into<String>) -> Self {
854 Self {
855 kind: kind.into(),
856 label: None,
857 definition: None,
858 }
859 }
860
861 pub fn with_label(mut self, label: Option<impl Into<String>>) -> Self {
862 self.label = label.map(Into::into);
863 self
864 }
865
866 pub fn with_definition(mut self, definition: Option<serde_json::Value>) -> Self {
867 self.definition = definition;
868 self
869 }
870
871 pub fn from_process_input(input: &ProcessInput) -> Self {
872 match input {
873 ProcessInput::ToolCall { call } => {
874 Self::new("tool").with_label(Some(call.tool_name.clone()))
875 }
876 ProcessInput::Engine { kind, .. } => Self::new(kind.clone()),
877 ProcessInput::SessionTurn { create_request, .. } => {
878 let label = create_request
879 .subagent
880 .as_ref()
881 .map(|subagent| subagent.capability.clone())
882 .or_else(|| create_request.usage_source.clone())
883 .or_else(|| create_request.session_id.clone());
884 Self::new("session_turn").with_label(label)
885 }
886 ProcessInput::External { metadata } => {
887 let label = metadata
888 .get("label")
889 .or_else(|| metadata.get("name"))
890 .or_else(|| metadata.get("title"))
891 .and_then(serde_json::Value::as_str)
892 .map(str::to_string);
893 Self::new("external").with_label(label)
894 }
895 }
896 }
897}
898
899pub const PROCESS_LEASE_SCHEMA_VERSION: u32 = 1;
904
905#[derive(Clone, Debug, Serialize, Deserialize)]
919pub struct ProcessLease {
920 pub schema_version: u32,
921 pub process_id: ProcessId,
922 pub owner_id: String,
923 pub lease_token: String,
924 pub fencing_token: u64,
925 pub claimed_at_epoch_ms: u64,
926 pub expires_at_epoch_ms: u64,
927}
928
929#[derive(Clone, Debug, Serialize, Deserialize)]
930pub struct ProcessLeaseCompletion {
931 pub process_id: ProcessId,
932 pub lease_token: String,
933}
934
935impl ProcessLeaseCompletion {
936 pub fn from_lease(lease: &ProcessLease) -> Self {
937 Self {
938 process_id: lease.process_id.clone(),
939 lease_token: lease.lease_token.clone(),
940 }
941 }
942}
943
944#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq)]
946pub struct ProcessExternalRef {
947 pub backend: String,
948 pub id: String,
949 #[serde(default, skip_serializing_if = "Option::is_none")]
950 pub metadata: Option<serde_json::Value>,
951}
952
953#[derive(Clone, Debug, Default, PartialEq, Eq, Serialize, Deserialize)]
954pub struct ProcessHandleDescriptor {
955 #[serde(default, skip_serializing_if = "Option::is_none")]
956 pub kind: Option<String>,
957 #[serde(default, skip_serializing_if = "Option::is_none")]
958 pub label: Option<String>,
959}
960
961impl ProcessHandleDescriptor {
962 pub fn new(kind: Option<impl Into<String>>, label: Option<impl Into<String>>) -> Self {
963 Self {
964 kind: kind.map(Into::into),
965 label: label.map(Into::into),
966 }
967 }
968}
969
970#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
971pub struct ProcessHandleGrant {
972 pub session_id: String,
973 pub process_id: ProcessId,
974 pub descriptor: ProcessHandleDescriptor,
975}
976
977pub type ProcessHandleGrantEntry = (ProcessHandleGrant, ProcessRecord);
978
979#[derive(Clone, Copy, Debug, Default, PartialEq, Eq, Serialize, Deserialize)]
980#[serde(rename_all = "snake_case")]
981pub enum ProcessLifecycleStatus {
982 #[default]
983 Running,
984 Completed,
985 Failed,
986 Cancelled,
987}
988
989impl ProcessLifecycleStatus {
990 pub fn label(self) -> &'static str {
991 match self {
992 Self::Running => "running",
993 Self::Completed => "completed",
994 Self::Failed => "failed",
995 Self::Cancelled => "cancelled",
996 }
997 }
998
999 pub fn is_terminal(self) -> bool {
1000 !matches!(self, Self::Running)
1001 }
1002
1003 pub fn terminal_state(self) -> Option<ProcessTerminalState> {
1004 match self {
1005 Self::Running => None,
1006 Self::Completed => Some(ProcessTerminalState::Completed),
1007 Self::Failed => Some(ProcessTerminalState::Failed),
1008 Self::Cancelled => Some(ProcessTerminalState::Cancelled),
1009 }
1010 }
1011}
1012
1013impl From<&ProcessStatus> for ProcessLifecycleStatus {
1014 fn from(status: &ProcessStatus) -> Self {
1015 match status {
1016 ProcessStatus::Running => Self::Running,
1017 ProcessStatus::Completed { .. } => Self::Completed,
1018 ProcessStatus::Failed { .. } => Self::Failed,
1019 ProcessStatus::Cancelled { .. } => Self::Cancelled,
1020 }
1021 }
1022}
1023
1024impl From<ProcessStatus> for ProcessLifecycleStatus {
1025 fn from(status: ProcessStatus) -> Self {
1026 Self::from(&status)
1027 }
1028}
1029
1030#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
1031pub struct ProcessHandleSummary {
1032 #[serde(rename = "__handle__")]
1033 pub handle_type: String,
1034 pub id: ProcessId,
1035 pub process_id: ProcessId,
1036 pub descriptor: ProcessHandleDescriptor,
1037 #[serde(default, skip_serializing_if = "Option::is_none")]
1038 pub definition: Option<serde_json::Value>,
1039 pub status: ProcessLifecycleStatus,
1040}
1041
1042impl ProcessHandleSummary {
1043 pub fn new(
1044 process_id: impl Into<ProcessId>,
1045 descriptor: ProcessHandleDescriptor,
1046 status: ProcessLifecycleStatus,
1047 ) -> Self {
1048 let process_id = process_id.into();
1049 Self {
1050 handle_type: "process".to_string(),
1051 id: process_id.clone(),
1052 process_id,
1053 descriptor,
1054 definition: None,
1055 status,
1056 }
1057 }
1058
1059 pub fn with_definition(mut self, definition: Option<serde_json::Value>) -> Self {
1060 self.definition = definition;
1061 self
1062 }
1063
1064 pub fn from_grant_record(grant: ProcessHandleGrant, record: ProcessRecord) -> Self {
1065 let definition = record.identity.definition.clone();
1066 Self::new(
1067 record.id,
1068 grant.descriptor,
1069 ProcessLifecycleStatus::from(record.status),
1070 )
1071 .with_definition(definition)
1072 }
1073}
1074
1075impl From<ProcessHandleGrantEntry> for ProcessHandleSummary {
1076 fn from((grant, record): ProcessHandleGrantEntry) -> Self {
1077 Self::from_grant_record(grant, record)
1078 }
1079}
1080
1081#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
1082pub struct ProcessCancelSummary {
1083 pub process_id: ProcessId,
1084 pub status: ProcessLifecycleStatus,
1085}
1086
1087impl ProcessCancelSummary {
1088 pub fn from_record(record: ProcessRecord) -> Self {
1089 Self {
1090 process_id: record.id,
1091 status: ProcessLifecycleStatus::from(record.status),
1092 }
1093 }
1094}
1095
1096#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)]
1097pub enum ProcessStatusFilter {
1098 #[default]
1099 Running,
1100 Completed,
1101 Failed,
1102 Cancelled,
1103 Any,
1104}
1105
1106impl ProcessStatusFilter {
1107 pub fn decode(value: Option<&str>) -> Result<Self, String> {
1108 match value.unwrap_or("running") {
1109 "running" => Ok(Self::Running),
1110 "completed" => Ok(Self::Completed),
1111 "failed" => Ok(Self::Failed),
1112 "cancelled" => Ok(Self::Cancelled),
1113 "any" => Ok(Self::Any),
1114 other => Err(format!(
1115 "processes.list status must be `running`, `completed`, `failed`, `cancelled`, or `any`, got `{other}`"
1116 )),
1117 }
1118 }
1119
1120 pub fn list_mode(self) -> ProcessListMode {
1121 match self {
1122 Self::Running => ProcessListMode::Live,
1123 Self::Completed | Self::Failed | Self::Cancelled | Self::Any => ProcessListMode::All,
1124 }
1125 }
1126
1127 pub fn matches(self, status: ProcessLifecycleStatus) -> bool {
1128 match self {
1129 Self::Running => status == ProcessLifecycleStatus::Running,
1130 Self::Completed => status == ProcessLifecycleStatus::Completed,
1131 Self::Failed => status == ProcessLifecycleStatus::Failed,
1132 Self::Cancelled => status == ProcessLifecycleStatus::Cancelled,
1133 Self::Any => true,
1134 }
1135 }
1136}
1137
1138#[derive(Clone, Debug, Default, PartialEq)]
1139pub struct ProcessListFilter {
1140 pub definition: Option<serde_json::Value>,
1141 pub status: ProcessStatusFilter,
1142 pub waiting: Option<bool>,
1143}
1144
1145impl ProcessListFilter {
1146 pub fn decode(args: &serde_json::Value) -> Result<Self, String> {
1147 let map = args
1148 .as_object()
1149 .ok_or_else(|| "processes.list expects a record of process filters".to_string())?;
1150 for key in map.keys() {
1151 match key.as_str() {
1152 "definition" | "status" | "waiting" => {}
1153 _ => return Err(format!("processes.list unknown filter `{key}`")),
1154 }
1155 }
1156 let definition = args.get("definition").cloned();
1157 let status =
1158 ProcessStatusFilter::decode(args.get("status").and_then(serde_json::Value::as_str))?;
1159 let waiting = args
1160 .get("waiting")
1161 .map(|value| {
1162 value
1163 .as_bool()
1164 .ok_or_else(|| "processes.list `waiting` filter must be a boolean".to_string())
1165 })
1166 .transpose()?;
1167 Ok(Self {
1168 definition,
1169 status,
1170 waiting,
1171 })
1172 }
1173
1174 pub fn list_mode(&self) -> ProcessListMode {
1175 self.status.list_mode()
1176 }
1177
1178 pub fn matches_entry(&self, entry: &ProcessHandleGrantEntry) -> bool {
1179 let (_grant, record) = entry;
1180 self.matches_record(record)
1181 }
1182
1183 pub fn matches_record(&self, record: &ProcessRecord) -> bool {
1184 let status = ProcessLifecycleStatus::from(&record.status);
1185 self.status.matches(status)
1186 && self
1187 .definition
1188 .as_ref()
1189 .is_none_or(|definition| record.identity.definition.as_ref() == Some(definition))
1190 && self
1191 .waiting
1192 .is_none_or(|waiting| record.wait.is_some() == waiting)
1193 }
1194}
1195
/// Mode of a process listing, serialized in `snake_case` (`"live"` / `"all"`).
///
/// `Live` is the default variant.
// NOTE(review): presumably `Live` limits a listing to processes that are
// still active while `All` also includes finished ones — confirm against
// `ProcessStatusFilter::list_mode` and the listing code.
#[derive(Clone, Copy, Debug, Default, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum ProcessListMode {
    #[default]
    Live,
    All,
}
1203
1204impl ProcessListMode {
1205 pub fn as_str(self) -> &'static str {
1206 match self {
1207 Self::Live => "live",
1208 Self::All => "all",
1209 }
1210 }
1211}
1212
/// Pairs a session scope with the handle descriptor used for a process start
/// grant.
// NOTE(review): how this grant is issued and consumed is not visible in this
// part of the file — confirm with the code that constructs it.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub struct ProcessStartGrant {
    /// Session scope carried by the grant.
    pub session_scope: SessionScope,
    /// Handle descriptor carried by the grant.
    pub descriptor: ProcessHandleDescriptor,
}
1218
/// Report describing the outcome of deleting a session's process state.
///
/// The `Default` value has an empty session id, zero counts and empty lists.
// NOTE(review): the field descriptions below are inferred from the field
// names only; the code that fills this report is not visible here — confirm.
#[derive(Clone, Debug, Default, PartialEq, Eq, Serialize, Deserialize)]
pub struct ProcessSessionDeleteReport {
    /// Identifier of the session the report refers to.
    pub session_id: String,
    /// Presumably the number of process handles revoked for the session.
    pub revoked_handle_count: usize,
    /// Presumably the number of wake entries deleted for the session.
    pub deleted_wake_count: usize,
    /// Presumably ids of processes left without an owning session.
    pub orphaned_process_ids: Vec<String>,
    /// Presumably ids of processes that were kept.
    pub preserved_process_ids: Vec<String>,
}
1227
#[cfg(test)]
mod tests {
    use serde_json::json;

    use super::*;

    /// Engine kind shared by every record and descriptor built in these tests.
    const ENGINE_KIND: &str = "test-engine";

    /// Builds the JSON value used as a process definition reference.
    fn process_value(component: &str, pos: usize, name: &str) -> serde_json::Value {
        json!({
            "component": component,
            "pos": pos,
            "name": name,
        })
    }

    /// Builds the `Completed` status (successful `true` output) used by the
    /// status-filter test.
    fn completed_status() -> ProcessStatus {
        let output = crate::ToolCallOutput::success(json!(true));
        ProcessStatus::Completed {
            await_output: ProcessAwaitOutput::from_tool_output(output),
        }
    }

    /// Builds a grant entry for an engine process with the given definition,
    /// label and status.
    fn engine_entry(
        process_id: &str,
        definition: serde_json::Value,
        process_name: &str,
        status: ProcessStatus,
    ) -> ProcessHandleGrantEntry {
        let input = ProcessInput::Engine {
            kind: ENGINE_KIND.to_string(),
            payload: json!({
                "definition": definition.clone(),
                "label": process_name,
            }),
        };
        let registration = ProcessRegistration::new(process_id, input, ProcessProvenance::host());

        let identity = ProcessIdentity::new(ENGINE_KIND)
            .with_label(Some(process_name))
            .with_definition(Some(definition));
        let env_ref = ProcessExecutionEnvRef::new(format!("process-env:test:{process_id}"));
        let registration = registration
            .with_identity(identity)
            .with_execution_env_ref(Some(env_ref));

        let mut record = ProcessRecord::from_registration(registration);
        record.status = status;

        let grant = ProcessHandleGrant {
            session_id: "session".to_string(),
            process_id: process_id.to_string(),
            descriptor: ProcessHandleDescriptor::new(Some(ENGINE_KIND), Some(process_name)),
        };
        (grant, record)
    }

    #[test]
    fn process_list_filter_matches_definition_and_status() {
        let target_ref = process_value("target", 0, "target");
        let other_ref = process_value("other", 1, "other");

        let args = json!({
            "definition": target_ref,
            "status": "completed"
        });
        let filter = ProcessListFilter::decode(&args).expect("decode filter");
        assert_eq!(filter.list_mode(), ProcessListMode::All);

        // Both entries are completed; only the definition differs.
        let cases = [
            (
                engine_entry("matching", target_ref, "target", completed_status()),
                true,
            ),
            (
                engine_entry("wrong-definition", other_ref, "other", completed_status()),
                false,
            ),
        ];
        for (entry, expected) in &cases {
            assert_eq!(
                filter.matches_entry(entry),
                *expected,
                "entry `{}`",
                entry.0.process_id
            );
        }
    }

    #[test]
    fn process_list_filter_matches_waiting_facet() {
        let process_ref = process_value("target", 0, "target");

        let idle_entry = engine_entry(
            "idle",
            process_ref.clone(),
            "target",
            ProcessStatus::Running,
        );
        let mut waiting_entry =
            engine_entry("waiting", process_ref, "target", ProcessStatus::Running);
        let signal_wait = WaitKind::Signal {
            name: "ready".to_string(),
            event_type: "signal.ready".to_string(),
            key: "process:waiting:signal.ready:1".to_string(),
            ordinal: 1,
        };
        waiting_entry.1.wait = Some(WaitState {
            since_ms: 42,
            kind: signal_wait,
        });

        let waiting_filter =
            ProcessListFilter::decode(&json!({ "waiting": true })).expect("decode waiting filter");
        let idle_filter =
            ProcessListFilter::decode(&json!({ "waiting": false })).expect("decode idle filter");

        assert_eq!(waiting_filter.list_mode(), ProcessListMode::Live);

        assert!(waiting_filter.matches_entry(&waiting_entry));
        assert!(!waiting_filter.matches_entry(&idle_entry));

        assert!(!idle_filter.matches_entry(&waiting_entry));
        assert!(idle_filter.matches_entry(&idle_entry));

        // A non-boolean `waiting` value is rejected with a descriptive message.
        let rejection = ProcessListFilter::decode(&json!({ "waiting": "yes" }))
            .expect_err("invalid waiting filter");
        assert!(rejection.contains("must be a boolean"));
    }
}