1use std::fmt;
2use std::sync::Arc;
3
4use serde::{Deserialize, Serialize};
5
6use super::events::{
7 ProcessAwaitOutput, ProcessEventType, ProcessTerminalSemantics, ProcessTerminalState,
8 default_process_event_types,
9};
10use super::time::current_epoch_ms;
11use super::validation::{
12 ensure_core_event_types, process_registration_hash, validate_process_registration,
13};
14
15pub type ProcessId = String;
16
17#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize)]
18#[serde(transparent)]
19pub struct SessionScopeId(String);
20
21impl SessionScopeId {
22 pub fn new(value: impl Into<String>) -> Self {
23 Self(value.into())
24 }
25
26 pub fn as_str(&self) -> &str {
27 &self.0
28 }
29}
30
31impl fmt::Display for SessionScopeId {
32 fn fmt(&self, formatter: &mut fmt::Formatter<'_>) -> fmt::Result {
33 formatter.write_str(&self.0)
34 }
35}
36
37impl From<String> for SessionScopeId {
38 fn from(value: String) -> Self {
39 Self::new(value)
40 }
41}
42
43impl From<&str> for SessionScopeId {
44 fn from(value: &str) -> Self {
45 Self::new(value)
46 }
47}
48
49#[derive(Debug, Serialize, Deserialize)]
51#[serde(tag = "type", rename_all = "snake_case")]
52pub enum ProcessInput {
53 ToolCall {
54 call: crate::PreparedToolCall,
55 },
56 LashlangProcess {
57 module_ref: lashlang::ModuleRef,
58 process_ref: lashlang::ProcessRef,
59 required_surface_ref: lashlang::RequiredSurfaceRef,
60 process_name: String,
61 #[serde(default)]
62 args: serde_json::Map<String, serde_json::Value>,
63 },
64 SessionTurn {
65 create_request: Box<crate::SessionCreateRequest>,
66 turn_input: Box<crate::TurnInput>,
67 output_contract: crate::ToolOutputContract,
68 },
69 External {
70 #[serde(default)]
71 metadata: serde_json::Value,
72 },
73}
74
75impl Clone for ProcessInput {
76 fn clone(&self) -> Self {
77 match self {
78 Self::ToolCall { call } => Self::ToolCall { call: call.clone() },
79 Self::LashlangProcess {
80 module_ref,
81 process_ref,
82 required_surface_ref,
83 process_name,
84 args,
85 } => Self::LashlangProcess {
86 module_ref: module_ref.clone(),
87 process_ref: process_ref.clone(),
88 required_surface_ref: required_surface_ref.clone(),
89 process_name: process_name.clone(),
90 args: args.clone(),
91 },
92 Self::SessionTurn {
93 create_request,
94 turn_input,
95 output_contract,
96 } => Self::SessionTurn {
97 create_request: create_request.clone(),
98 turn_input: turn_input.clone(),
99 output_contract: output_contract.clone(),
100 },
101 Self::External { metadata } => Self::External {
102 metadata: metadata.clone(),
103 },
104 }
105 }
106}
107
108#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
109#[serde(transparent)]
110pub struct ProcessExecutionEnvRef(String);
111
112impl ProcessExecutionEnvRef {
113 pub fn new(value: impl Into<String>) -> Self {
114 Self(value.into())
115 }
116
117 pub fn as_str(&self) -> &str {
118 &self.0
119 }
120}
121
122impl fmt::Display for ProcessExecutionEnvRef {
123 fn fmt(&self, formatter: &mut fmt::Formatter<'_>) -> fmt::Result {
124 formatter.write_str(&self.0)
125 }
126}
127
128#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
129pub struct ProcessExecutionEnvSpec {
130 #[serde(default)]
131 pub plugin_options: crate::PluginOptions,
132 #[serde(default)]
133 pub policy: crate::SessionPolicy,
134}
135
136impl ProcessExecutionEnvSpec {
137 pub fn new(plugin_options: crate::PluginOptions, policy: crate::SessionPolicy) -> Self {
138 Self {
139 plugin_options,
140 policy,
141 }
142 }
143
144 pub fn stable_ref(&self) -> Result<ProcessExecutionEnvRef, serde_json::Error> {
145 crate::stable_hash::stable_json_sha256_hex(self)
146 .map(|hash| ProcessExecutionEnvRef::new(format!("process-env:sha256:{hash}")))
147 }
148
149 pub fn to_store_bytes(&self) -> Result<Vec<u8>, serde_json::Error> {
150 serde_json::to_vec(self)
151 }
152
153 pub fn from_store_bytes(bytes: &[u8]) -> Result<Self, serde_json::Error> {
154 serde_json::from_slice(bytes)
155 }
156}
157
158pub async fn persist_process_execution_env(
159 artifact_store: &dyn lashlang::LashlangArtifactStore,
160 spec: &ProcessExecutionEnvSpec,
161) -> Result<ProcessExecutionEnvRef, crate::PluginError> {
162 let env_ref = spec.stable_ref().map_err(|err| {
163 crate::PluginError::Session(format!("failed to hash process execution env: {err}"))
164 })?;
165 let bytes = spec.to_store_bytes().map_err(|err| {
166 crate::PluginError::Session(format!("failed to encode process execution env: {err}"))
167 })?;
168 artifact_store
169 .put_artifact_bytes(env_ref.as_str(), "process_execution_env", &bytes)
170 .await
171 .map_err(|err| {
172 crate::PluginError::Session(format!(
173 "failed to persist process execution env `{env_ref}`: {err}"
174 ))
175 })?;
176 Ok(env_ref)
177}
178
179pub async fn load_process_execution_env(
180 artifact_store: &dyn lashlang::LashlangArtifactStore,
181 env_ref: &ProcessExecutionEnvRef,
182) -> Result<ProcessExecutionEnvSpec, crate::PluginError> {
183 let bytes = artifact_store
184 .get_artifact_bytes(env_ref.as_str())
185 .await
186 .map_err(|err| {
187 crate::PluginError::Session(format!(
188 "failed to load process execution env `{env_ref}`: {err}"
189 ))
190 })?
191 .ok_or_else(|| {
192 crate::PluginError::Session(format!("missing process execution env `{env_ref}`"))
193 })?;
194 ProcessExecutionEnvSpec::from_store_bytes(&bytes).map_err(|err| {
195 crate::PluginError::Session(format!(
196 "failed to decode process execution env `{env_ref}`: {err}"
197 ))
198 })
199}
200
201#[derive(Clone, Debug, Default, Serialize, Deserialize)]
204pub struct ProcessExecutionContext {
205 #[serde(default, skip_serializing_if = "Option::is_none")]
206 pub causal_invocation: Option<crate::RuntimeInvocation>,
207}
208
209impl ProcessExecutionContext {
210 pub fn with_causal_invocation(mut self, invocation: Option<crate::RuntimeInvocation>) -> Self {
211 self.causal_invocation = invocation;
212 self
213 }
214
215 pub fn is_empty(&self) -> bool {
216 self.causal_invocation.is_none()
217 }
218}
219
220#[derive(Clone)]
221pub struct ProcessOpScope<'scope> {
222 pub(crate) parent_invocation: Option<crate::RuntimeInvocation>,
223 pub(crate) effect_controller: crate::runtime::RuntimeEffectControllerHandle<'scope>,
224 pub(crate) agent_frame_id: Option<crate::AgentFrameId>,
225 pub(crate) target_agent_frame_id: Option<crate::AgentFrameId>,
226}
227
228impl<'scope> ProcessOpScope<'scope> {
229 pub fn new(scoped_effect_controller: crate::ScopedEffectController<'scope>) -> Self {
230 Self {
231 parent_invocation: None,
232 effect_controller: crate::runtime::RuntimeEffectControllerHandle::borrowed(
233 scoped_effect_controller,
234 ),
235 agent_frame_id: None,
236 target_agent_frame_id: None,
237 }
238 }
239
240 pub fn with_parent_invocation(
241 mut self,
242 parent_invocation: Option<crate::RuntimeInvocation>,
243 ) -> Self {
244 self.parent_invocation = parent_invocation;
245 self
246 }
247
248 pub fn with_agent_frame_id(mut self, agent_frame_id: Option<crate::AgentFrameId>) -> Self {
249 self.agent_frame_id = agent_frame_id;
250 self
251 }
252
253 pub fn with_target_agent_frame_id(
254 mut self,
255 agent_frame_id: Option<crate::AgentFrameId>,
256 ) -> Self {
257 self.target_agent_frame_id = agent_frame_id;
258 self
259 }
260
261 pub fn agent_frame_id(&self) -> Option<&str> {
262 self.agent_frame_id.as_deref()
263 }
264
265 pub fn target_agent_frame_id(&self) -> Option<&str> {
266 self.target_agent_frame_id.as_deref()
267 }
268
269 pub(crate) fn controller(&self) -> &dyn crate::RuntimeEffectController {
270 self.effect_controller.controller()
271 }
272}
273
274#[derive(Clone, Debug, Default)]
275pub struct ProcessStartOptions {
276 pub descriptor: Option<ProcessHandleDescriptor>,
277 pub spawn_provenance: Option<ProcessSpawnProvenance>,
285}
286
287#[derive(Clone, Debug, PartialEq, Eq)]
292pub struct ProcessSpawnProvenance {
293 pub originator: ProcessOriginator,
294 pub wake_target: Option<SessionScope>,
295}
296
297impl ProcessStartOptions {
298 pub fn new() -> Self {
299 Self::default()
300 }
301
302 pub fn with_descriptor(mut self, descriptor: ProcessHandleDescriptor) -> Self {
303 self.descriptor = Some(descriptor);
304 self
305 }
306
307 pub fn with_optional_descriptor(mut self, descriptor: Option<ProcessHandleDescriptor>) -> Self {
308 self.descriptor = descriptor;
309 self
310 }
311
312 pub fn with_spawn_provenance(mut self, spawn_provenance: ProcessSpawnProvenance) -> Self {
313 self.spawn_provenance = Some(spawn_provenance);
314 self
315 }
316
317 pub fn execution_context(&self, scope: &ProcessOpScope<'_>) -> ProcessExecutionContext {
318 ProcessExecutionContext {
319 causal_invocation: scope.parent_invocation.clone(),
320 }
321 }
322}
323
324#[derive(Clone, Debug, Serialize, Deserialize)]
326pub struct ProcessStartRequest {
327 pub id: ProcessId,
328 pub input: ProcessInput,
329 #[serde(default, skip_serializing_if = "Option::is_none")]
330 pub env_spec: Option<ProcessExecutionEnvSpec>,
331 pub originator: ProcessOriginator,
332 #[serde(default, skip_serializing_if = "Option::is_none")]
333 pub wake_target: Option<SessionScope>,
334 #[serde(default, skip_serializing_if = "Option::is_none")]
335 pub grant: Option<ProcessStartGrant>,
336 #[serde(default)]
337 pub event_types: Vec<ProcessEventType>,
338}
339
340impl ProcessStartRequest {
341 pub fn new(
342 id: impl Into<ProcessId>,
343 input: ProcessInput,
344 originator: ProcessOriginator,
345 ) -> Self {
346 Self {
347 id: id.into(),
348 input,
349 env_spec: None,
350 originator,
351 wake_target: None,
352 grant: None,
353 event_types: default_process_event_types(),
354 }
355 }
356
357 pub fn external(
358 id: impl Into<ProcessId>,
359 originator: ProcessOriginator,
360 metadata: serde_json::Value,
361 ) -> Self {
362 Self::new(id, ProcessInput::External { metadata }, originator)
363 }
364
365 pub fn with_env_spec(mut self, env_spec: ProcessExecutionEnvSpec) -> Self {
366 self.env_spec = Some(env_spec);
367 self
368 }
369
370 pub fn with_wake_target(mut self, wake_target: Option<SessionScope>) -> Self {
371 self.wake_target = wake_target;
372 self
373 }
374
375 pub fn with_grant(mut self, grant: Option<ProcessStartGrant>) -> Self {
376 self.grant = grant;
377 self
378 }
379
380 pub fn with_event_types(
381 mut self,
382 event_types: impl IntoIterator<Item = ProcessEventType>,
383 ) -> Self {
384 self.event_types = event_types.into_iter().collect();
385 self
386 }
387
388 pub fn with_extra_event_types(
389 mut self,
390 event_types: impl IntoIterator<Item = ProcessEventType>,
391 ) -> Self {
392 self.event_types.extend(event_types);
393 self
394 }
395
396 pub fn into_registration(
397 self,
398 host_profile_id: impl Into<String>,
399 env_ref: Option<ProcessExecutionEnvRef>,
400 ) -> ProcessRegistration {
401 ProcessRegistration::new(
402 self.id,
403 self.input,
404 ProcessProvenance::new(self.originator, host_profile_id),
405 )
406 .with_event_types(self.event_types)
407 .with_execution_env_ref(env_ref)
408 .with_wake_target(self.wake_target)
409 }
410}
411
412#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
413pub struct SessionScope {
414 pub session_id: String,
415 #[serde(default, skip_serializing_if = "Option::is_none")]
416 pub agent_frame_id: Option<crate::AgentFrameId>,
417}
418
419#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
420pub struct ProcessProvenance {
421 pub originator: ProcessOriginator,
422 pub host_profile_id: String,
423 #[serde(default, skip_serializing_if = "Option::is_none")]
424 pub caused_by: Option<crate::CausalRef>,
425}
426
427impl ProcessProvenance {
428 pub fn new(originator: ProcessOriginator, host_profile_id: impl Into<String>) -> Self {
429 Self {
430 originator,
431 host_profile_id: host_profile_id.into(),
432 caused_by: None,
433 }
434 }
435
436 pub fn host(host_profile_id: impl Into<String>) -> Self {
437 Self::new(ProcessOriginator::host(), host_profile_id)
438 }
439
440 pub fn session(scope: SessionScope, host_profile_id: impl Into<String>) -> Self {
441 Self::new(ProcessOriginator::session(scope), host_profile_id)
442 }
443
444 pub fn with_caused_by(mut self, caused_by: Option<crate::CausalRef>) -> Self {
445 self.caused_by = caused_by;
446 self
447 }
448}
449
450#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
451#[serde(tag = "type", rename_all = "snake_case")]
452pub enum ProcessOriginator {
453 Host,
454 Session { scope: SessionScope },
455}
456
457impl ProcessOriginator {
458 pub fn host() -> Self {
459 Self::Host
460 }
461
462 pub fn session(scope: SessionScope) -> Self {
463 Self::Session { scope }
464 }
465
466 pub fn scope_id(&self) -> String {
467 match self {
468 Self::Host => "host".to_string(),
469 Self::Session { scope } => scope.id().to_string(),
470 }
471 }
472}
473
474impl SessionScope {
475 pub fn new(session_id: impl Into<String>) -> Self {
476 Self {
477 session_id: session_id.into(),
478 agent_frame_id: None,
479 }
480 }
481
482 pub fn for_agent_frame(
483 session_id: impl Into<String>,
484 agent_frame_id: impl Into<crate::AgentFrameId>,
485 ) -> Self {
486 Self {
487 session_id: session_id.into(),
488 agent_frame_id: Some(agent_frame_id.into()),
489 }
490 }
491
492 pub fn id(&self) -> SessionScopeId {
493 match self.agent_frame_id.as_deref() {
494 Some(frame_id) if !frame_id.is_empty() => {
495 SessionScopeId::new(format!("session:{}/frame:{frame_id}", self.session_id))
496 }
497 _ => SessionScopeId::new(format!("session:{}", self.session_id)),
498 }
499 }
500
501 pub fn is_empty(&self) -> bool {
502 self.session_id.is_empty()
503 }
504}
505
506#[derive(Debug, Serialize, Deserialize)]
508pub struct ProcessRegistration {
509 pub id: ProcessId,
510 pub input: Arc<ProcessInput>,
511 #[serde(default)]
512 pub event_types: Vec<ProcessEventType>,
513 pub provenance: ProcessProvenance,
514 #[serde(default, skip_serializing_if = "Option::is_none")]
515 pub env_ref: Option<ProcessExecutionEnvRef>,
516 #[serde(default, skip_serializing_if = "Option::is_none")]
517 pub wake_target: Option<SessionScope>,
518}
519
520impl Clone for ProcessRegistration {
521 fn clone(&self) -> Self {
522 Self {
523 id: self.id.clone(),
524 input: Arc::clone(&self.input),
525 event_types: self.event_types.clone(),
526 provenance: self.provenance.clone(),
527 env_ref: self.env_ref.clone(),
528 wake_target: self.wake_target.clone(),
529 }
530 }
531}
532
533impl ProcessRegistration {
534 pub fn new(
535 id: impl Into<ProcessId>,
536 input: ProcessInput,
537 provenance: ProcessProvenance,
538 ) -> Self {
539 Self {
540 id: id.into(),
541 input: Arc::new(input),
542 event_types: default_process_event_types(),
543 provenance,
544 env_ref: None,
545 wake_target: None,
546 }
547 }
548
549 pub(crate) fn session_start_draft(id: impl Into<ProcessId>, input: ProcessInput) -> Self {
550 Self::new(id, input, ProcessProvenance::host("session-start-draft"))
551 }
552
553 pub fn with_process_provenance(mut self, provenance: ProcessProvenance) -> Self {
554 self.provenance = provenance;
555 self
556 }
557
558 pub fn with_execution_env_ref(mut self, env_ref: Option<ProcessExecutionEnvRef>) -> Self {
559 self.env_ref = env_ref;
560 self
561 }
562
563 pub fn with_wake_target(mut self, wake_target: Option<SessionScope>) -> Self {
564 self.wake_target = wake_target;
565 self
566 }
567
568 pub fn with_event_types(
569 mut self,
570 event_types: impl IntoIterator<Item = ProcessEventType>,
571 ) -> Self {
572 self.event_types = event_types.into_iter().collect();
573 self
574 }
575
576 pub fn with_extra_event_types(
577 mut self,
578 event_types: impl IntoIterator<Item = ProcessEventType>,
579 ) -> Self {
580 self.event_types.extend(event_types);
581 self
582 }
583}
584
585#[derive(Clone, Debug, Default, PartialEq, Serialize, Deserialize)]
586#[serde(tag = "state", rename_all = "snake_case")]
587pub enum ProcessStatus {
588 #[default]
589 Running,
590 Completed {
591 await_output: ProcessAwaitOutput,
592 },
593 Failed {
594 await_output: ProcessAwaitOutput,
595 },
596 Cancelled {
597 await_output: ProcessAwaitOutput,
598 },
599}
600
601impl ProcessStatus {
602 pub fn from_terminal(terminal: ProcessTerminalSemantics) -> Self {
603 match terminal.state {
604 ProcessTerminalState::Completed => Self::Completed {
605 await_output: terminal.await_output,
606 },
607 ProcessTerminalState::Failed => Self::Failed {
608 await_output: terminal.await_output,
609 },
610 ProcessTerminalState::Cancelled => Self::Cancelled {
611 await_output: terminal.await_output,
612 },
613 }
614 }
615
616 pub fn is_terminal(&self) -> bool {
617 !matches!(self, Self::Running)
618 }
619
620 pub fn label(&self) -> &'static str {
621 match self {
622 Self::Running => "running",
623 Self::Completed { .. } => "completed",
624 Self::Failed { .. } => "failed",
625 Self::Cancelled { .. } => "cancelled",
626 }
627 }
628
629 pub fn terminal_state(&self) -> Option<ProcessTerminalState> {
630 match self {
631 Self::Running => None,
632 Self::Completed { .. } => Some(ProcessTerminalState::Completed),
633 Self::Failed { .. } => Some(ProcessTerminalState::Failed),
634 Self::Cancelled { .. } => Some(ProcessTerminalState::Cancelled),
635 }
636 }
637
638 pub fn await_output(&self) -> Option<&ProcessAwaitOutput> {
639 match self {
640 Self::Running => None,
641 Self::Completed { await_output }
642 | Self::Failed { await_output }
643 | Self::Cancelled { await_output } => Some(await_output),
644 }
645 }
646
647 pub fn terminal_semantics(&self) -> Option<ProcessTerminalSemantics> {
648 Some(ProcessTerminalSemantics {
649 state: self.terminal_state()?,
650 await_output: self.await_output()?.clone(),
651 })
652 }
653}
654
655#[derive(Clone, Debug, Serialize, Deserialize)]
658pub struct ProcessRecord {
659 pub id: ProcessId,
660 pub registration_hash: String,
661 pub input: Arc<ProcessInput>,
662 #[serde(default)]
663 pub event_types: Vec<ProcessEventType>,
664 pub provenance: ProcessProvenance,
665 #[serde(default, skip_serializing_if = "Option::is_none")]
666 pub env_ref: Option<ProcessExecutionEnvRef>,
667 #[serde(default, skip_serializing_if = "Option::is_none")]
668 pub wake_target: Option<SessionScope>,
669 #[serde(default)]
670 pub created_at_ms: u64,
671 #[serde(default)]
672 pub updated_at_ms: u64,
673 #[serde(default, skip_serializing_if = "Option::is_none")]
674 pub external_ref: Option<ProcessExternalRef>,
675 #[serde(default, skip_serializing_if = "Option::is_none")]
676 pub wait: Option<WaitState>,
677 #[serde(default)]
678 pub status: ProcessStatus,
679}
680
681#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
682pub struct WaitState {
683 pub kind: WaitKind,
684 pub since_ms: u64,
685}
686
687#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
688#[serde(tag = "kind", rename_all = "snake_case")]
689pub enum WaitKind {
690 Signal {
691 name: String,
692 event_type: String,
693 key: String,
694 ordinal: u64,
695 },
696}
697
698impl ProcessRecord {
699 pub fn from_registration(mut registration: ProcessRegistration) -> Self {
700 ensure_core_event_types(&mut registration);
701 validate_process_registration(®istration)
702 .expect("process registration should be valid before record construction");
703 let registration_hash = process_registration_hash(®istration)
704 .expect("process registration should hash before record construction");
705 Self::from_prepared_registration(registration, registration_hash, current_epoch_ms())
706 }
707
708 pub fn from_prepared_registration(
709 registration: ProcessRegistration,
710 registration_hash: String,
711 now_ms: u64,
712 ) -> Self {
713 Self {
714 id: registration.id,
715 registration_hash,
716 input: registration.input,
717 event_types: registration.event_types,
718 provenance: registration.provenance,
719 env_ref: registration.env_ref,
720 wake_target: registration.wake_target,
721 created_at_ms: now_ms,
722 updated_at_ms: now_ms,
723 external_ref: None,
724 wait: None,
725 status: ProcessStatus::Running,
726 }
727 }
728
729 pub fn is_terminal(&self) -> bool {
730 self.status.is_terminal()
731 }
732
733 pub fn originator_scope_id(&self) -> String {
734 self.provenance.originator.scope_id()
735 }
736
737 pub fn host_profile_id(&self) -> &str {
738 &self.provenance.host_profile_id
739 }
740}
741
742pub const PROCESS_LEASE_SCHEMA_VERSION: u32 = 1;
747
748#[derive(Clone, Debug, Serialize, Deserialize)]
762pub struct ProcessLease {
763 pub schema_version: u32,
764 pub process_id: ProcessId,
765 pub owner_id: String,
766 pub lease_token: String,
767 pub fencing_token: u64,
768 pub claimed_at_epoch_ms: u64,
769 pub expires_at_epoch_ms: u64,
770}
771
772#[derive(Clone, Debug, Serialize, Deserialize)]
773pub struct ProcessLeaseCompletion {
774 pub process_id: ProcessId,
775 pub lease_token: String,
776}
777
778impl ProcessLeaseCompletion {
779 pub fn from_lease(lease: &ProcessLease) -> Self {
780 Self {
781 process_id: lease.process_id.clone(),
782 lease_token: lease.lease_token.clone(),
783 }
784 }
785}
786
787#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq)]
789pub struct ProcessExternalRef {
790 pub backend: String,
791 pub id: String,
792 #[serde(default, skip_serializing_if = "Option::is_none")]
793 pub metadata: Option<serde_json::Value>,
794}
795
796#[derive(Clone, Debug, Default, PartialEq, Eq, Serialize, Deserialize)]
797pub struct ProcessHandleDescriptor {
798 #[serde(default, skip_serializing_if = "Option::is_none")]
799 pub kind: Option<String>,
800 #[serde(default, skip_serializing_if = "Option::is_none")]
801 pub label: Option<String>,
802}
803
804impl ProcessHandleDescriptor {
805 pub fn new(kind: Option<impl Into<String>>, label: Option<impl Into<String>>) -> Self {
806 Self {
807 kind: kind.map(Into::into),
808 label: label.map(Into::into),
809 }
810 }
811}
812
813#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
814pub struct ProcessHandleGrant {
815 pub session_id: String,
816 pub process_id: ProcessId,
817 pub descriptor: ProcessHandleDescriptor,
818}
819
820pub type ProcessHandleGrantEntry = (ProcessHandleGrant, ProcessRecord);
821
822#[derive(Clone, Copy, Debug, Default, PartialEq, Eq, Serialize, Deserialize)]
823#[serde(rename_all = "snake_case")]
824pub enum ProcessLifecycleStatus {
825 #[default]
826 Running,
827 Completed,
828 Failed,
829 Cancelled,
830}
831
832impl ProcessLifecycleStatus {
833 pub fn label(self) -> &'static str {
834 match self {
835 Self::Running => "running",
836 Self::Completed => "completed",
837 Self::Failed => "failed",
838 Self::Cancelled => "cancelled",
839 }
840 }
841
842 pub fn is_terminal(self) -> bool {
843 !matches!(self, Self::Running)
844 }
845
846 pub fn terminal_state(self) -> Option<ProcessTerminalState> {
847 match self {
848 Self::Running => None,
849 Self::Completed => Some(ProcessTerminalState::Completed),
850 Self::Failed => Some(ProcessTerminalState::Failed),
851 Self::Cancelled => Some(ProcessTerminalState::Cancelled),
852 }
853 }
854}
855
856impl From<&ProcessStatus> for ProcessLifecycleStatus {
857 fn from(status: &ProcessStatus) -> Self {
858 match status {
859 ProcessStatus::Running => Self::Running,
860 ProcessStatus::Completed { .. } => Self::Completed,
861 ProcessStatus::Failed { .. } => Self::Failed,
862 ProcessStatus::Cancelled { .. } => Self::Cancelled,
863 }
864 }
865}
866
867impl From<ProcessStatus> for ProcessLifecycleStatus {
868 fn from(status: ProcessStatus) -> Self {
869 Self::from(&status)
870 }
871}
872
873#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
874pub struct ProcessHandleSummary {
875 #[serde(rename = "__handle__")]
876 pub handle_type: String,
877 pub id: ProcessId,
878 pub process_id: ProcessId,
879 pub descriptor: ProcessHandleDescriptor,
880 #[serde(default, skip_serializing_if = "Option::is_none")]
881 pub definition: Option<ProcessDefinitionSummary>,
882 pub status: ProcessLifecycleStatus,
883}
884
885impl ProcessHandleSummary {
886 pub fn new(
887 process_id: impl Into<ProcessId>,
888 descriptor: ProcessHandleDescriptor,
889 status: ProcessLifecycleStatus,
890 ) -> Self {
891 let process_id = process_id.into();
892 Self {
893 handle_type: "process".to_string(),
894 id: process_id.clone(),
895 process_id,
896 descriptor,
897 definition: None,
898 status,
899 }
900 }
901
902 pub fn with_definition(mut self, definition: Option<ProcessDefinitionSummary>) -> Self {
903 self.definition = definition;
904 self
905 }
906
907 pub fn from_grant_record(grant: ProcessHandleGrant, record: ProcessRecord) -> Self {
908 let definition = ProcessDefinitionSummary::from_input(record.input.as_ref());
909 Self::new(
910 record.id,
911 grant.descriptor,
912 ProcessLifecycleStatus::from(record.status),
913 )
914 .with_definition(definition)
915 }
916}
917
918impl From<ProcessHandleGrantEntry> for ProcessHandleSummary {
919 fn from((grant, record): ProcessHandleGrantEntry) -> Self {
920 Self::from_grant_record(grant, record)
921 }
922}
923
924#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
925pub struct ProcessCancelSummary {
926 pub process_id: ProcessId,
927 pub status: ProcessLifecycleStatus,
928}
929
930impl ProcessCancelSummary {
931 pub fn from_record(record: ProcessRecord) -> Self {
932 Self {
933 process_id: record.id,
934 status: ProcessLifecycleStatus::from(record.status),
935 }
936 }
937}
938
939#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
940pub struct ProcessDefinitionSummary {
941 pub name: String,
942}
943
944impl ProcessDefinitionSummary {
945 pub fn from_input(input: &ProcessInput) -> Option<Self> {
946 match input {
947 ProcessInput::LashlangProcess { process_name, .. } => Some(Self {
948 name: process_name.clone(),
949 }),
950 ProcessInput::ToolCall { .. }
951 | ProcessInput::SessionTurn { .. }
952 | ProcessInput::External { .. } => None,
953 }
954 }
955}
956
957#[derive(Clone, Debug, PartialEq, Eq)]
958pub struct ProcessDefinitionSelector {
959 module_ref: lashlang::ModuleRef,
960 required_surface_ref: lashlang::RequiredSurfaceRef,
961 process_ref: lashlang::ProcessRef,
962 process_name: String,
963}
964
965impl ProcessDefinitionSelector {
966 pub fn decode(value: &serde_json::Value) -> Result<Self, String> {
967 if value
968 .get(lashlang::LASH_PROCESS_VALUE_KEY)
969 .and_then(serde_json::Value::as_bool)
970 != Some(true)
971 {
972 return Err("definition must be a process definition value".to_string());
973 }
974 Ok(Self {
975 module_ref: decode_process_definition_field(
976 value,
977 lashlang::LASH_MODULE_REF_KEY,
978 "definition",
979 )?,
980 required_surface_ref: decode_process_definition_field(
981 value,
982 lashlang::LASH_REQUIRED_SURFACE_REF_KEY,
983 "definition",
984 )?,
985 process_ref: decode_process_definition_field(
986 value,
987 lashlang::LASH_PROCESS_REF_KEY,
988 "definition",
989 )?,
990 process_name: value
991 .get(lashlang::LASH_PROCESS_NAME_KEY)
992 .and_then(serde_json::Value::as_str)
993 .ok_or_else(|| "definition is missing its process name".to_string())?
994 .to_string(),
995 })
996 }
997
998 pub fn matches_input(&self, input: &ProcessInput) -> bool {
999 match input {
1000 ProcessInput::LashlangProcess {
1001 module_ref,
1002 process_ref,
1003 required_surface_ref,
1004 process_name,
1005 ..
1006 } => {
1007 self.module_ref == *module_ref
1008 && self.required_surface_ref == *required_surface_ref
1009 && self.process_ref == *process_ref
1010 && self.process_name == *process_name
1011 }
1012 ProcessInput::ToolCall { .. }
1013 | ProcessInput::SessionTurn { .. }
1014 | ProcessInput::External { .. } => false,
1015 }
1016 }
1017}
1018
1019fn decode_process_definition_field<T: serde::de::DeserializeOwned>(
1020 value: &serde_json::Value,
1021 field: &'static str,
1022 label: &'static str,
1023) -> Result<T, String> {
1024 serde_json::from_value(
1025 value
1026 .get(field)
1027 .cloned()
1028 .ok_or_else(|| format!("{label} is missing {field}"))?,
1029 )
1030 .map_err(|err| format!("{label} has invalid {field}: {err}"))
1031}
1032
1033#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)]
1034pub enum ProcessStatusFilter {
1035 #[default]
1036 Running,
1037 Completed,
1038 Failed,
1039 Cancelled,
1040 Any,
1041}
1042
1043impl ProcessStatusFilter {
1044 pub fn decode(value: Option<&str>) -> Result<Self, String> {
1045 match value.unwrap_or("running") {
1046 "running" => Ok(Self::Running),
1047 "completed" => Ok(Self::Completed),
1048 "failed" => Ok(Self::Failed),
1049 "cancelled" => Ok(Self::Cancelled),
1050 "any" => Ok(Self::Any),
1051 other => Err(format!(
1052 "processes.list status must be `running`, `completed`, `failed`, `cancelled`, or `any`, got `{other}`"
1053 )),
1054 }
1055 }
1056
1057 pub fn list_mode(self) -> ProcessListMode {
1058 match self {
1059 Self::Running => ProcessListMode::Live,
1060 Self::Completed | Self::Failed | Self::Cancelled | Self::Any => ProcessListMode::All,
1061 }
1062 }
1063
1064 pub fn matches(self, status: ProcessLifecycleStatus) -> bool {
1065 match self {
1066 Self::Running => status == ProcessLifecycleStatus::Running,
1067 Self::Completed => status == ProcessLifecycleStatus::Completed,
1068 Self::Failed => status == ProcessLifecycleStatus::Failed,
1069 Self::Cancelled => status == ProcessLifecycleStatus::Cancelled,
1070 Self::Any => true,
1071 }
1072 }
1073}
1074
1075#[derive(Clone, Debug, Default, PartialEq, Eq)]
1076pub struct ProcessListFilter {
1077 pub definition: Option<ProcessDefinitionSelector>,
1078 pub status: ProcessStatusFilter,
1079 pub waiting: Option<bool>,
1080}
1081
1082impl ProcessListFilter {
1083 pub fn decode(args: &serde_json::Value) -> Result<Self, String> {
1084 let map = args
1085 .as_object()
1086 .ok_or_else(|| "processes.list expects a record of process filters".to_string())?;
1087 for key in map.keys() {
1088 match key.as_str() {
1089 "definition" | "status" | "waiting" => {}
1090 _ => return Err(format!("processes.list unknown filter `{key}`")),
1091 }
1092 }
1093 let definition = args
1094 .get("definition")
1095 .map(ProcessDefinitionSelector::decode)
1096 .transpose()?;
1097 let status =
1098 ProcessStatusFilter::decode(args.get("status").and_then(serde_json::Value::as_str))?;
1099 let waiting = args
1100 .get("waiting")
1101 .map(|value| {
1102 value
1103 .as_bool()
1104 .ok_or_else(|| "processes.list `waiting` filter must be a boolean".to_string())
1105 })
1106 .transpose()?;
1107 Ok(Self {
1108 definition,
1109 status,
1110 waiting,
1111 })
1112 }
1113
1114 pub fn list_mode(&self) -> ProcessListMode {
1115 self.status.list_mode()
1116 }
1117
1118 pub fn matches_entry(&self, entry: &ProcessHandleGrantEntry) -> bool {
1119 let (_grant, record) = entry;
1120 self.matches_record(record)
1121 }
1122
1123 pub fn matches_record(&self, record: &ProcessRecord) -> bool {
1124 let status = ProcessLifecycleStatus::from(&record.status);
1125 self.status.matches(status)
1126 && self
1127 .definition
1128 .as_ref()
1129 .is_none_or(|definition| definition.matches_input(record.input.as_ref()))
1130 && self
1131 .waiting
1132 .is_none_or(|waiting| record.wait.is_some() == waiting)
1133 }
1134}
1135
1136#[derive(Clone, Copy, Debug, Default, PartialEq, Eq, Serialize, Deserialize)]
1137#[serde(rename_all = "snake_case")]
1138pub enum ProcessListMode {
1139 #[default]
1140 Live,
1141 All,
1142}
1143
1144impl ProcessListMode {
1145 pub fn as_str(self) -> &'static str {
1146 match self {
1147 Self::Live => "live",
1148 Self::All => "all",
1149 }
1150 }
1151}
1152
1153#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
1154pub struct ProcessStartGrant {
1155 pub session_scope: SessionScope,
1156 pub descriptor: ProcessHandleDescriptor,
1157}
1158
1159#[derive(Clone, Debug, Default, PartialEq, Eq, Serialize, Deserialize)]
1160pub struct ProcessSessionDeleteReport {
1161 pub session_id: String,
1162 pub revoked_handle_count: usize,
1163 pub deleted_wake_count: usize,
1164 pub orphaned_process_ids: Vec<String>,
1165 pub preserved_process_ids: Vec<String>,
1166}
1167
1168#[cfg(test)]
1169mod tests {
1170 use serde_json::json;
1171
1172 use super::*;
1173
1174 fn process_ref(component: &str, pos: usize) -> lashlang::ProcessRef {
1175 lashlang::ProcessRef {
1176 component: lashlang::ContentHash::new(component),
1177 pos: pos as u32,
1178 }
1179 }
1180
1181 fn process_value(
1182 module_ref: &lashlang::ModuleRef,
1183 surface_ref: &lashlang::RequiredSurfaceRef,
1184 process_ref: &lashlang::ProcessRef,
1185 name: &str,
1186 ) -> serde_json::Value {
1187 let mut value = serde_json::Map::new();
1188 value.insert(lashlang::LASH_PROCESS_VALUE_KEY.to_string(), json!(true));
1189 value.insert(lashlang::LASH_MODULE_REF_KEY.to_string(), json!(module_ref));
1190 value.insert(
1191 lashlang::LASH_REQUIRED_SURFACE_REF_KEY.to_string(),
1192 json!(surface_ref),
1193 );
1194 value.insert(
1195 lashlang::LASH_PROCESS_REF_KEY.to_string(),
1196 json!(process_ref),
1197 );
1198 value.insert(lashlang::LASH_PROCESS_NAME_KEY.to_string(), json!(name));
1199 serde_json::Value::Object(value)
1200 }
1201
1202 fn lashlang_entry(
1203 process_id: &str,
1204 module_ref: lashlang::ModuleRef,
1205 surface_ref: lashlang::RequiredSurfaceRef,
1206 process_ref: lashlang::ProcessRef,
1207 process_name: &str,
1208 status: ProcessStatus,
1209 ) -> ProcessHandleGrantEntry {
1210 let mut record = ProcessRecord::from_registration(
1211 ProcessRegistration::new(
1212 process_id,
1213 ProcessInput::LashlangProcess {
1214 module_ref,
1215 process_ref,
1216 required_surface_ref: surface_ref,
1217 process_name: process_name.to_string(),
1218 args: serde_json::Map::new(),
1219 },
1220 ProcessProvenance::host("model-test-host"),
1221 )
1222 .with_execution_env_ref(Some(ProcessExecutionEnvRef::new(format!(
1223 "process-env:test:{process_id}"
1224 )))),
1225 );
1226 record.status = status;
1227 (
1228 ProcessHandleGrant {
1229 session_id: "session".to_string(),
1230 process_id: process_id.to_string(),
1231 descriptor: ProcessHandleDescriptor::new(Some("lashlang"), Some(process_name)),
1232 },
1233 record,
1234 )
1235 }
1236
1237 #[test]
1238 fn process_list_filter_matches_definition_and_status() {
1239 let module_ref = lashlang::ModuleRef::new(&lashlang::ContentHash::new("module"));
1240 let surface_ref = lashlang::RequiredSurfaceRef::new(&lashlang::ContentHash::new("surface"));
1241 let target_ref = process_ref("target", 0);
1242 let other_ref = process_ref("other", 1);
1243 let filter = ProcessListFilter::decode(&json!({
1244 "definition": process_value(&module_ref, &surface_ref, &target_ref, "target"),
1245 "status": "completed"
1246 }))
1247 .expect("decode filter");
1248
1249 let matching = lashlang_entry(
1250 "matching",
1251 module_ref.clone(),
1252 surface_ref.clone(),
1253 target_ref,
1254 "target",
1255 ProcessStatus::Completed {
1256 await_output: ProcessAwaitOutput::from_tool_output(crate::ToolCallOutput::success(
1257 json!(true),
1258 )),
1259 },
1260 );
1261 let wrong_definition = lashlang_entry(
1262 "wrong-definition",
1263 module_ref,
1264 surface_ref,
1265 other_ref,
1266 "other",
1267 ProcessStatus::Completed {
1268 await_output: ProcessAwaitOutput::from_tool_output(crate::ToolCallOutput::success(
1269 json!(true),
1270 )),
1271 },
1272 );
1273
1274 assert_eq!(filter.list_mode(), ProcessListMode::All);
1275 assert!(filter.matches_entry(&matching));
1276 assert!(!filter.matches_entry(&wrong_definition));
1277 }
1278
1279 #[test]
1280 fn process_list_filter_matches_waiting_facet() {
1281 let module_ref = lashlang::ModuleRef::new(&lashlang::ContentHash::new("module"));
1282 let surface_ref = lashlang::RequiredSurfaceRef::new(&lashlang::ContentHash::new("surface"));
1283 let process_ref = process_ref("target", 0);
1284 let mut waiting_entry = lashlang_entry(
1285 "waiting",
1286 module_ref.clone(),
1287 surface_ref.clone(),
1288 process_ref.clone(),
1289 "target",
1290 ProcessStatus::Running,
1291 );
1292 waiting_entry.1.wait = Some(WaitState {
1293 since_ms: 42,
1294 kind: WaitKind::Signal {
1295 name: "ready".to_string(),
1296 event_type: "signal.ready".to_string(),
1297 key: "process:waiting:signal.ready:1".to_string(),
1298 ordinal: 1,
1299 },
1300 });
1301 let idle_entry = lashlang_entry(
1302 "idle",
1303 module_ref,
1304 surface_ref,
1305 process_ref,
1306 "target",
1307 ProcessStatus::Running,
1308 );
1309 let waiting_filter =
1310 ProcessListFilter::decode(&json!({ "waiting": true })).expect("decode waiting filter");
1311 let idle_filter =
1312 ProcessListFilter::decode(&json!({ "waiting": false })).expect("decode idle filter");
1313
1314 assert_eq!(waiting_filter.list_mode(), ProcessListMode::Live);
1315 assert!(waiting_filter.matches_entry(&waiting_entry));
1316 assert!(!waiting_filter.matches_entry(&idle_entry));
1317 assert!(!idle_filter.matches_entry(&waiting_entry));
1318 assert!(idle_filter.matches_entry(&idle_entry));
1319 assert!(
1320 ProcessListFilter::decode(&json!({ "waiting": "yes" }))
1321 .expect_err("invalid waiting filter")
1322 .contains("must be a boolean")
1323 );
1324 }
1325}