1use std::fmt;
2use std::sync::Arc;
3
4use serde::{Deserialize, Serialize};
5
6use super::events::{
7 ProcessAwaitOutput, ProcessEventType, ProcessTerminalSemantics, ProcessTerminalState,
8 default_process_event_types,
9};
10use super::time::current_epoch_ms;
11use super::validation::{
12 ensure_core_event_types, process_registration_hash, validate_process_registration,
13};
14
/// Identifier assigned to a process; currently a plain `String` alias.
pub type ProcessId = String;
16
/// Newtype around a `String` that identifies a session scope.
///
/// `#[serde(transparent)]` makes it (de)serialize as the bare inner string
/// rather than as a wrapper object.
#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize)]
#[serde(transparent)]
pub struct SessionScopeId(String);
20
21impl SessionScopeId {
22 pub fn new(value: impl Into<String>) -> Self {
23 Self(value.into())
24 }
25
26 pub fn as_str(&self) -> &str {
27 &self.0
28 }
29}
30
31impl fmt::Display for SessionScopeId {
32 fn fmt(&self, formatter: &mut fmt::Formatter<'_>) -> fmt::Result {
33 formatter.write_str(&self.0)
34 }
35}
36
37impl From<String> for SessionScopeId {
38 fn from(value: String) -> Self {
39 Self::new(value)
40 }
41}
42
43impl From<&str> for SessionScopeId {
44 fn from(value: &str) -> Self {
45 Self::new(value)
46 }
47}
48
/// Input a process is started with.
///
/// Serialized as an internally tagged enum: the variant name is written in
/// snake_case under the `type` key.
#[derive(Debug, Serialize, Deserialize)]
#[serde(tag = "type", rename_all = "snake_case")]
pub enum ProcessInput {
    /// Input that carries a prepared tool call.
    ToolCall {
        /// The prepared call.
        call: crate::PreparedToolCall,
    },
    /// Input that names a lashlang process via its module, process and host
    /// requirements references.
    LashlangProcess {
        module_ref: lashlang::ModuleRef,
        process_ref: lashlang::ProcessRef,
        host_requirements_ref: lashlang::HostRequirementsRef,
        /// Name of the process; also surfaced by `ProcessDefinitionSummary`.
        process_name: String,
        /// Arguments as a JSON object; an absent field deserializes to an empty map.
        #[serde(default)]
        args: serde_json::Map<String, serde_json::Value>,
    },
    /// Input that carries a session create request, a turn input and an
    /// output contract. The two large payloads are boxed.
    SessionTurn {
        create_request: Box<crate::SessionCreateRequest>,
        turn_input: Box<crate::TurnInput>,
        output_contract: crate::ToolOutputContract,
    },
    /// Input that carries only free-form JSON metadata.
    External {
        /// An absent field deserializes to `serde_json::Value::default()` (JSON null).
        #[serde(default)]
        metadata: serde_json::Value,
    },
}
74
75impl Clone for ProcessInput {
76 fn clone(&self) -> Self {
77 match self {
78 Self::ToolCall { call } => Self::ToolCall { call: call.clone() },
79 Self::LashlangProcess {
80 module_ref,
81 process_ref,
82 host_requirements_ref,
83 process_name,
84 args,
85 } => Self::LashlangProcess {
86 module_ref: module_ref.clone(),
87 process_ref: process_ref.clone(),
88 host_requirements_ref: host_requirements_ref.clone(),
89 process_name: process_name.clone(),
90 args: args.clone(),
91 },
92 Self::SessionTurn {
93 create_request,
94 turn_input,
95 output_contract,
96 } => Self::SessionTurn {
97 create_request: create_request.clone(),
98 turn_input: turn_input.clone(),
99 output_contract: output_contract.clone(),
100 },
101 Self::External { metadata } => Self::External {
102 metadata: metadata.clone(),
103 },
104 }
105 }
106}
107
/// String newtype referencing a stored process execution environment.
///
/// `ProcessExecutionEnvSpec::stable_ref` produces values of the form
/// `process-env:sha256:<hash>`. `#[serde(transparent)]` makes the type
/// (de)serialize as the bare string.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
#[serde(transparent)]
pub struct ProcessExecutionEnvRef(String);
111
112impl ProcessExecutionEnvRef {
113 pub fn new(value: impl Into<String>) -> Self {
114 Self(value.into())
115 }
116
117 pub fn as_str(&self) -> &str {
118 &self.0
119 }
120}
121
122impl fmt::Display for ProcessExecutionEnvRef {
123 fn fmt(&self, formatter: &mut fmt::Formatter<'_>) -> fmt::Result {
124 formatter.write_str(&self.0)
125 }
126}
127
/// Execution environment specification for a process: plugin options plus a
/// session policy.
///
/// Deserialization rejects unknown fields (`deny_unknown_fields`); both
/// fields fall back to their `Default` value when absent.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
#[serde(deny_unknown_fields)]
pub struct ProcessExecutionEnvSpec {
    /// Plugin options for the environment.
    #[serde(default)]
    pub plugin_options: crate::PluginOptions,
    /// Session policy for the environment.
    #[serde(default)]
    pub policy: crate::SessionPolicy,
}
136
137impl ProcessExecutionEnvSpec {
138 pub fn new(plugin_options: crate::PluginOptions, policy: crate::SessionPolicy) -> Self {
139 Self {
140 plugin_options,
141 policy,
142 }
143 }
144
145 pub fn stable_ref(&self) -> Result<ProcessExecutionEnvRef, serde_json::Error> {
146 crate::stable_hash::stable_json_sha256_hex(self)
147 .map(|hash| ProcessExecutionEnvRef::new(format!("process-env:sha256:{hash}")))
148 }
149
150 pub fn to_store_bytes(&self) -> Result<Vec<u8>, serde_json::Error> {
151 serde_json::to_vec(self)
152 }
153
154 pub fn from_store_bytes(bytes: &[u8]) -> Result<Self, serde_json::Error> {
155 serde_json::from_slice(bytes)
156 }
157}
158
159pub async fn persist_process_execution_env(
160 artifact_store: &dyn lashlang::LashlangArtifactStore,
161 spec: &ProcessExecutionEnvSpec,
162) -> Result<ProcessExecutionEnvRef, crate::PluginError> {
163 let env_ref = spec.stable_ref().map_err(|err| {
164 crate::PluginError::Session(format!("failed to hash process execution env: {err}"))
165 })?;
166 let bytes = spec.to_store_bytes().map_err(|err| {
167 crate::PluginError::Session(format!("failed to encode process execution env: {err}"))
168 })?;
169 artifact_store
170 .put_artifact_bytes(env_ref.as_str(), "process_execution_env", &bytes)
171 .await
172 .map_err(|err| {
173 crate::PluginError::Session(format!(
174 "failed to persist process execution env `{env_ref}`: {err}"
175 ))
176 })?;
177 Ok(env_ref)
178}
179
180pub async fn load_process_execution_env(
181 artifact_store: &dyn lashlang::LashlangArtifactStore,
182 env_ref: &ProcessExecutionEnvRef,
183) -> Result<ProcessExecutionEnvSpec, crate::PluginError> {
184 let bytes = artifact_store
185 .get_artifact_bytes(env_ref.as_str())
186 .await
187 .map_err(|err| {
188 crate::PluginError::Session(format!(
189 "failed to load process execution env `{env_ref}`: {err}"
190 ))
191 })?
192 .ok_or_else(|| {
193 crate::PluginError::Session(format!("missing process execution env `{env_ref}`"))
194 })?;
195 ProcessExecutionEnvSpec::from_store_bytes(&bytes).map_err(|err| {
196 crate::PluginError::Session(format!(
197 "failed to decode process execution env `{env_ref}`: {err}"
198 ))
199 })
200}
201
/// Context attached to a process execution.
///
/// Currently holds only the optional invocation that caused the execution.
#[derive(Clone, Debug, Default, Serialize, Deserialize)]
pub struct ProcessExecutionContext {
    /// Invocation that caused this execution, if any; omitted from the
    /// serialized form when `None`.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub causal_invocation: Option<crate::RuntimeInvocation>,
}
209
210impl ProcessExecutionContext {
211 pub fn with_causal_invocation(mut self, invocation: Option<crate::RuntimeInvocation>) -> Self {
212 self.causal_invocation = invocation;
213 self
214 }
215
216 pub fn is_empty(&self) -> bool {
217 self.causal_invocation.is_none()
218 }
219}
220
/// Scope in which a process operation runs.
///
/// Bundles an effect controller handle with the optional parent invocation
/// and the optional agent frame ids configured through the builder methods.
#[derive(Clone)]
pub struct ProcessOpScope<'scope> {
    /// Invocation the operation is performed under, if any.
    pub(crate) parent_invocation: Option<crate::RuntimeInvocation>,
    /// Handle used to reach the runtime effect controller.
    pub(crate) effect_controller: crate::runtime::RuntimeEffectControllerHandle<'scope>,
    /// Agent frame id of the scope, if any.
    pub(crate) agent_frame_id: Option<crate::AgentFrameId>,
    /// Target agent frame id, if any.
    pub(crate) target_agent_frame_id: Option<crate::AgentFrameId>,
}
228
229impl<'scope> ProcessOpScope<'scope> {
230 pub fn new(scoped_effect_controller: crate::ScopedEffectController<'scope>) -> Self {
231 Self {
232 parent_invocation: None,
233 effect_controller: crate::runtime::RuntimeEffectControllerHandle::borrowed(
234 scoped_effect_controller,
235 ),
236 agent_frame_id: None,
237 target_agent_frame_id: None,
238 }
239 }
240
241 pub fn with_parent_invocation(
242 mut self,
243 parent_invocation: Option<crate::RuntimeInvocation>,
244 ) -> Self {
245 self.parent_invocation = parent_invocation;
246 self
247 }
248
249 pub fn with_agent_frame_id(mut self, agent_frame_id: Option<crate::AgentFrameId>) -> Self {
250 self.agent_frame_id = agent_frame_id;
251 self
252 }
253
254 pub fn with_target_agent_frame_id(
255 mut self,
256 agent_frame_id: Option<crate::AgentFrameId>,
257 ) -> Self {
258 self.target_agent_frame_id = agent_frame_id;
259 self
260 }
261
262 pub fn agent_frame_id(&self) -> Option<&str> {
263 self.agent_frame_id.as_deref()
264 }
265
266 pub fn target_agent_frame_id(&self) -> Option<&str> {
267 self.target_agent_frame_id.as_deref()
268 }
269
270 pub(crate) fn controller(&self) -> &dyn crate::RuntimeEffectController {
271 self.effect_controller.controller()
272 }
273}
274
/// Optional settings supplied when starting a process.
///
/// Both fields default to `None`.
#[derive(Clone, Debug, Default)]
pub struct ProcessStartOptions {
    /// Handle descriptor (kind / label) for the started process, if any.
    pub descriptor: Option<ProcessHandleDescriptor>,
    /// Originator and wake target recorded for the spawn, if any.
    pub spawn_provenance: Option<ProcessSpawnProvenance>,
}
287
/// Provenance attached to a spawn: who originated the process and which
/// session scope, if any, is its wake target.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct ProcessSpawnProvenance {
    /// Originator of the spawned process.
    pub originator: ProcessOriginator,
    /// Session scope to wake, if any.
    pub wake_target: Option<SessionScope>,
}
297
298impl ProcessStartOptions {
299 pub fn new() -> Self {
300 Self::default()
301 }
302
303 pub fn with_descriptor(mut self, descriptor: ProcessHandleDescriptor) -> Self {
304 self.descriptor = Some(descriptor);
305 self
306 }
307
308 pub fn with_optional_descriptor(mut self, descriptor: Option<ProcessHandleDescriptor>) -> Self {
309 self.descriptor = descriptor;
310 self
311 }
312
313 pub fn with_spawn_provenance(mut self, spawn_provenance: ProcessSpawnProvenance) -> Self {
314 self.spawn_provenance = Some(spawn_provenance);
315 self
316 }
317
318 pub fn execution_context(&self, scope: &ProcessOpScope<'_>) -> ProcessExecutionContext {
319 ProcessExecutionContext {
320 causal_invocation: scope.parent_invocation.clone(),
321 }
322 }
323}
324
/// Request to start a process.
///
/// Optional fields are omitted from the serialized form when `None`;
/// `event_types` deserializes to an empty list when absent.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct ProcessStartRequest {
    /// Id of the process to start.
    pub id: ProcessId,
    /// What the process is started with.
    pub input: ProcessInput,
    /// Execution environment spec, if any.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub env_spec: Option<ProcessExecutionEnvSpec>,
    /// Who originated the process.
    pub originator: ProcessOriginator,
    /// Session scope to wake, if any.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub wake_target: Option<SessionScope>,
    /// Handle grant requested with the start, if any.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub grant: Option<ProcessStartGrant>,
    /// Event types registered for the process.
    #[serde(default)]
    pub event_types: Vec<ProcessEventType>,
}
340
341impl ProcessStartRequest {
342 pub fn new(
343 id: impl Into<ProcessId>,
344 input: ProcessInput,
345 originator: ProcessOriginator,
346 ) -> Self {
347 Self {
348 id: id.into(),
349 input,
350 env_spec: None,
351 originator,
352 wake_target: None,
353 grant: None,
354 event_types: default_process_event_types(),
355 }
356 }
357
358 pub fn external(
359 id: impl Into<ProcessId>,
360 originator: ProcessOriginator,
361 metadata: serde_json::Value,
362 ) -> Self {
363 Self::new(id, ProcessInput::External { metadata }, originator)
364 }
365
366 pub fn with_env_spec(mut self, env_spec: ProcessExecutionEnvSpec) -> Self {
367 self.env_spec = Some(env_spec);
368 self
369 }
370
371 pub fn with_wake_target(mut self, wake_target: Option<SessionScope>) -> Self {
372 self.wake_target = wake_target;
373 self
374 }
375
376 pub fn with_grant(mut self, grant: Option<ProcessStartGrant>) -> Self {
377 self.grant = grant;
378 self
379 }
380
381 pub fn with_event_types(
382 mut self,
383 event_types: impl IntoIterator<Item = ProcessEventType>,
384 ) -> Self {
385 self.event_types = event_types.into_iter().collect();
386 self
387 }
388
389 pub fn with_extra_event_types(
390 mut self,
391 event_types: impl IntoIterator<Item = ProcessEventType>,
392 ) -> Self {
393 self.event_types.extend(event_types);
394 self
395 }
396
397 pub fn into_registration(self, env_ref: Option<ProcessExecutionEnvRef>) -> ProcessRegistration {
398 ProcessRegistration::new(self.id, self.input, ProcessProvenance::new(self.originator))
399 .with_event_types(self.event_types)
400 .with_execution_env_ref(env_ref)
401 .with_wake_target(self.wake_target)
402 }
403}
404
/// A session, optionally narrowed to one agent frame within it.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub struct SessionScope {
    /// Id of the session.
    pub session_id: String,
    /// Agent frame within the session, if any; omitted from the serialized
    /// form when `None`.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub agent_frame_id: Option<crate::AgentFrameId>,
}
411
/// Where a process came from: its originator and, optionally, a causal
/// reference.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub struct ProcessProvenance {
    /// Who originated the process.
    pub originator: ProcessOriginator,
    /// Causal reference, if any; omitted from the serialized form when `None`.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub caused_by: Option<crate::CausalRef>,
}
418
419impl ProcessProvenance {
420 pub fn new(originator: ProcessOriginator) -> Self {
421 Self {
422 originator,
423 caused_by: None,
424 }
425 }
426
427 pub fn host() -> Self {
428 Self::new(ProcessOriginator::host())
429 }
430
431 pub fn session(scope: SessionScope) -> Self {
432 Self::new(ProcessOriginator::session(scope))
433 }
434
435 pub fn with_caused_by(mut self, caused_by: Option<crate::CausalRef>) -> Self {
436 self.caused_by = caused_by;
437 self
438 }
439}
440
/// Who originated a process.
///
/// Serialized as an internally tagged enum: the variant name is written in
/// snake_case under the `type` key.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
#[serde(tag = "type", rename_all = "snake_case")]
pub enum ProcessOriginator {
    /// The host originated the process.
    Host,
    /// A session scope originated the process.
    Session { scope: SessionScope },
}
447
448impl ProcessOriginator {
449 pub fn host() -> Self {
450 Self::Host
451 }
452
453 pub fn session(scope: SessionScope) -> Self {
454 Self::Session { scope }
455 }
456
457 pub fn scope_id(&self) -> String {
458 match self {
459 Self::Host => "host".to_string(),
460 Self::Session { scope } => scope.id().to_string(),
461 }
462 }
463}
464
465impl SessionScope {
466 pub fn new(session_id: impl Into<String>) -> Self {
467 Self {
468 session_id: session_id.into(),
469 agent_frame_id: None,
470 }
471 }
472
473 pub fn for_agent_frame(
474 session_id: impl Into<String>,
475 agent_frame_id: impl Into<crate::AgentFrameId>,
476 ) -> Self {
477 Self {
478 session_id: session_id.into(),
479 agent_frame_id: Some(agent_frame_id.into()),
480 }
481 }
482
483 pub fn id(&self) -> SessionScopeId {
484 match self.agent_frame_id.as_deref() {
485 Some(frame_id) if !frame_id.is_empty() => {
486 SessionScopeId::new(format!("session:{}/frame:{frame_id}", self.session_id))
487 }
488 _ => SessionScopeId::new(format!("session:{}", self.session_id)),
489 }
490 }
491
492 pub fn is_empty(&self) -> bool {
493 self.session_id.is_empty()
494 }
495}
496
/// Registration describing a process before a `ProcessRecord` is built from it.
///
/// The input is held behind an `Arc`, so cloning a registration shares the
/// input instead of copying it.
#[derive(Debug, Serialize, Deserialize)]
pub struct ProcessRegistration {
    /// Id of the process.
    pub id: ProcessId,
    /// Shared process input.
    pub input: Arc<ProcessInput>,
    /// Event types registered for the process; empty when absent on
    /// deserialization.
    #[serde(default)]
    pub event_types: Vec<ProcessEventType>,
    /// Where the process came from.
    pub provenance: ProcessProvenance,
    /// Reference to the stored execution environment, if any.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub env_ref: Option<ProcessExecutionEnvRef>,
    /// Session scope to wake, if any.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub wake_target: Option<SessionScope>,
}
510
511impl Clone for ProcessRegistration {
512 fn clone(&self) -> Self {
513 Self {
514 id: self.id.clone(),
515 input: Arc::clone(&self.input),
516 event_types: self.event_types.clone(),
517 provenance: self.provenance.clone(),
518 env_ref: self.env_ref.clone(),
519 wake_target: self.wake_target.clone(),
520 }
521 }
522}
523
524impl ProcessRegistration {
525 pub fn new(
526 id: impl Into<ProcessId>,
527 input: ProcessInput,
528 provenance: ProcessProvenance,
529 ) -> Self {
530 Self {
531 id: id.into(),
532 input: Arc::new(input),
533 event_types: default_process_event_types(),
534 provenance,
535 env_ref: None,
536 wake_target: None,
537 }
538 }
539
540 pub(crate) fn session_start_draft(id: impl Into<ProcessId>, input: ProcessInput) -> Self {
541 Self::new(id, input, ProcessProvenance::host())
542 }
543
544 pub fn with_process_provenance(mut self, provenance: ProcessProvenance) -> Self {
545 self.provenance = provenance;
546 self
547 }
548
549 pub fn with_execution_env_ref(mut self, env_ref: Option<ProcessExecutionEnvRef>) -> Self {
550 self.env_ref = env_ref;
551 self
552 }
553
554 pub fn with_wake_target(mut self, wake_target: Option<SessionScope>) -> Self {
555 self.wake_target = wake_target;
556 self
557 }
558
559 pub fn with_event_types(
560 mut self,
561 event_types: impl IntoIterator<Item = ProcessEventType>,
562 ) -> Self {
563 self.event_types = event_types.into_iter().collect();
564 self
565 }
566
567 pub fn with_extra_event_types(
568 mut self,
569 event_types: impl IntoIterator<Item = ProcessEventType>,
570 ) -> Self {
571 self.event_types.extend(event_types);
572 self
573 }
574}
575
/// Status of a process; every terminal variant carries the await output.
///
/// Serialized as an internally tagged enum: the variant name is written in
/// snake_case under the `state` key. The default is `Running`.
#[derive(Clone, Debug, Default, PartialEq, Serialize, Deserialize)]
#[serde(tag = "state", rename_all = "snake_case")]
pub enum ProcessStatus {
    /// The process has not reached a terminal state.
    #[default]
    Running,
    /// Terminal: completed.
    Completed {
        await_output: ProcessAwaitOutput,
    },
    /// Terminal: failed.
    Failed {
        await_output: ProcessAwaitOutput,
    },
    /// Terminal: cancelled.
    Cancelled {
        await_output: ProcessAwaitOutput,
    },
}
591
592impl ProcessStatus {
593 pub fn from_terminal(terminal: ProcessTerminalSemantics) -> Self {
594 match terminal.state {
595 ProcessTerminalState::Completed => Self::Completed {
596 await_output: terminal.await_output,
597 },
598 ProcessTerminalState::Failed => Self::Failed {
599 await_output: terminal.await_output,
600 },
601 ProcessTerminalState::Cancelled => Self::Cancelled {
602 await_output: terminal.await_output,
603 },
604 }
605 }
606
607 pub fn is_terminal(&self) -> bool {
608 !matches!(self, Self::Running)
609 }
610
611 pub fn label(&self) -> &'static str {
612 match self {
613 Self::Running => "running",
614 Self::Completed { .. } => "completed",
615 Self::Failed { .. } => "failed",
616 Self::Cancelled { .. } => "cancelled",
617 }
618 }
619
620 pub fn terminal_state(&self) -> Option<ProcessTerminalState> {
621 match self {
622 Self::Running => None,
623 Self::Completed { .. } => Some(ProcessTerminalState::Completed),
624 Self::Failed { .. } => Some(ProcessTerminalState::Failed),
625 Self::Cancelled { .. } => Some(ProcessTerminalState::Cancelled),
626 }
627 }
628
629 pub fn await_output(&self) -> Option<&ProcessAwaitOutput> {
630 match self {
631 Self::Running => None,
632 Self::Completed { await_output }
633 | Self::Failed { await_output }
634 | Self::Cancelled { await_output } => Some(await_output),
635 }
636 }
637
638 pub fn terminal_semantics(&self) -> Option<ProcessTerminalSemantics> {
639 Some(ProcessTerminalSemantics {
640 state: self.terminal_state()?,
641 await_output: self.await_output()?.clone(),
642 })
643 }
644}
645
/// Record of a process: the registration data plus its hash, timestamps,
/// optional external reference, optional wait state, and status.
///
/// Optional fields are omitted from the serialized form when `None`; the
/// fields marked `#[serde(default)]` fall back to their `Default` value when
/// absent on deserialization.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct ProcessRecord {
    /// Id of the process.
    pub id: ProcessId,
    /// Hash of the registration this record was built from.
    pub registration_hash: String,
    /// Shared process input.
    pub input: Arc<ProcessInput>,
    /// Event types registered for the process.
    #[serde(default)]
    pub event_types: Vec<ProcessEventType>,
    /// Where the process came from.
    pub provenance: ProcessProvenance,
    /// Reference to the stored execution environment, if any.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub env_ref: Option<ProcessExecutionEnvRef>,
    /// Session scope to wake, if any.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub wake_target: Option<SessionScope>,
    /// Creation timestamp in epoch milliseconds.
    #[serde(default)]
    pub created_at_ms: u64,
    /// Last-update timestamp in epoch milliseconds; equals `created_at_ms`
    /// on a freshly built record.
    #[serde(default)]
    pub updated_at_ms: u64,
    /// Reference to an external backend object, if any.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub external_ref: Option<ProcessExternalRef>,
    /// Current wait state, if the process is waiting.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub wait: Option<WaitState>,
    /// Current status; `Running` when absent on deserialization.
    #[serde(default)]
    pub status: ProcessStatus,
}
671
/// What a process is waiting on and since when.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub struct WaitState {
    /// What is being waited on.
    pub kind: WaitKind,
    /// Start of the wait; presumably epoch milliseconds like the record
    /// timestamps — confirm against the writer.
    pub since_ms: u64,
}
677
/// Kinds of wait a process can be in.
///
/// Serialized as an internally tagged enum: the variant name is written in
/// snake_case under the `kind` key.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
#[serde(tag = "kind", rename_all = "snake_case")]
pub enum WaitKind {
    /// Waiting on a signal identified by name, event type, key and ordinal.
    Signal {
        name: String,
        event_type: String,
        key: String,
        ordinal: u64,
    },
}
688
689impl ProcessRecord {
690 pub fn from_registration(mut registration: ProcessRegistration) -> Self {
691 ensure_core_event_types(&mut registration);
692 validate_process_registration(®istration)
693 .expect("process registration should be valid before record construction");
694 let registration_hash = process_registration_hash(®istration)
695 .expect("process registration should hash before record construction");
696 Self::from_prepared_registration(registration, registration_hash, current_epoch_ms())
697 }
698
699 pub fn from_prepared_registration(
700 registration: ProcessRegistration,
701 registration_hash: String,
702 now_ms: u64,
703 ) -> Self {
704 Self {
705 id: registration.id,
706 registration_hash,
707 input: registration.input,
708 event_types: registration.event_types,
709 provenance: registration.provenance,
710 env_ref: registration.env_ref,
711 wake_target: registration.wake_target,
712 created_at_ms: now_ms,
713 updated_at_ms: now_ms,
714 external_ref: None,
715 wait: None,
716 status: ProcessStatus::Running,
717 }
718 }
719
720 pub fn is_terminal(&self) -> bool {
721 self.status.is_terminal()
722 }
723
724 pub fn originator_scope_id(&self) -> String {
725 self.provenance.originator.scope_id()
726 }
727}
728
/// Schema version number for process leases.
// NOTE(review): presumably the value written to `ProcessLease::schema_version`;
// the writer is not in this file — confirm.
pub const PROCESS_LEASE_SCHEMA_VERSION: u32 = 1;
734
/// Lease held on a process by an owner.
// NOTE(review): the field notes below follow the field names only; the lease
// store that defines the exact semantics is not in this file.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct ProcessLease {
    /// Schema version of the lease record.
    pub schema_version: u32,
    /// Id of the leased process.
    pub process_id: ProcessId,
    /// Id of the lease owner.
    pub owner_id: String,
    /// Token identifying this lease; copied into `ProcessLeaseCompletion`.
    pub lease_token: String,
    /// Fencing token associated with the lease.
    pub fencing_token: u64,
    /// Claim time in epoch milliseconds.
    pub claimed_at_epoch_ms: u64,
    /// Expiry time in epoch milliseconds.
    pub expires_at_epoch_ms: u64,
}
758
/// Pair of process id and lease token identifying a lease being completed.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct ProcessLeaseCompletion {
    /// Id of the leased process.
    pub process_id: ProcessId,
    /// Token of the lease being completed.
    pub lease_token: String,
}
764
765impl ProcessLeaseCompletion {
766 pub fn from_lease(lease: &ProcessLease) -> Self {
767 Self {
768 process_id: lease.process_id.clone(),
769 lease_token: lease.lease_token.clone(),
770 }
771 }
772}
773
/// Reference to an object in an external backend, with optional metadata.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq)]
pub struct ProcessExternalRef {
    /// Name of the backend.
    pub backend: String,
    /// Id within that backend.
    pub id: String,
    /// Free-form JSON metadata; omitted from the serialized form when `None`.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub metadata: Option<serde_json::Value>,
}
782
/// Optional kind and label describing a process handle.
///
/// Both fields are omitted from the serialized form when `None`.
#[derive(Clone, Debug, Default, PartialEq, Eq, Serialize, Deserialize)]
pub struct ProcessHandleDescriptor {
    /// Kind of the handle, if any.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub kind: Option<String>,
    /// Label of the handle, if any.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub label: Option<String>,
}
790
791impl ProcessHandleDescriptor {
792 pub fn new(kind: Option<impl Into<String>>, label: Option<impl Into<String>>) -> Self {
793 Self {
794 kind: kind.map(Into::into),
795 label: label.map(Into::into),
796 }
797 }
798}
799
/// Grant of a process handle to a session, together with the handle's
/// descriptor.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub struct ProcessHandleGrant {
    /// Id of the session the handle is granted to.
    pub session_id: String,
    /// Id of the process the handle refers to.
    pub process_id: ProcessId,
    /// Descriptor (kind / label) of the handle.
    pub descriptor: ProcessHandleDescriptor,
}
806
/// A handle grant paired with the record of the process it refers to.
pub type ProcessHandleGrantEntry = (ProcessHandleGrant, ProcessRecord);
808
/// Payload-free view of `ProcessStatus`: the lifecycle state without the
/// await output.
///
/// Serialized as a snake_case string; the default is `Running`.
#[derive(Clone, Copy, Debug, Default, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum ProcessLifecycleStatus {
    #[default]
    Running,
    Completed,
    Failed,
    Cancelled,
}
818
819impl ProcessLifecycleStatus {
820 pub fn label(self) -> &'static str {
821 match self {
822 Self::Running => "running",
823 Self::Completed => "completed",
824 Self::Failed => "failed",
825 Self::Cancelled => "cancelled",
826 }
827 }
828
829 pub fn is_terminal(self) -> bool {
830 !matches!(self, Self::Running)
831 }
832
833 pub fn terminal_state(self) -> Option<ProcessTerminalState> {
834 match self {
835 Self::Running => None,
836 Self::Completed => Some(ProcessTerminalState::Completed),
837 Self::Failed => Some(ProcessTerminalState::Failed),
838 Self::Cancelled => Some(ProcessTerminalState::Cancelled),
839 }
840 }
841}
842
843impl From<&ProcessStatus> for ProcessLifecycleStatus {
844 fn from(status: &ProcessStatus) -> Self {
845 match status {
846 ProcessStatus::Running => Self::Running,
847 ProcessStatus::Completed { .. } => Self::Completed,
848 ProcessStatus::Failed { .. } => Self::Failed,
849 ProcessStatus::Cancelled { .. } => Self::Cancelled,
850 }
851 }
852}
853
854impl From<ProcessStatus> for ProcessLifecycleStatus {
855 fn from(status: ProcessStatus) -> Self {
856 Self::from(&status)
857 }
858}
859
/// Summary of a process handle: ids, descriptor, optional definition
/// summary, and lifecycle status.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub struct ProcessHandleSummary {
    /// Handle type marker, serialized under the `__handle__` key;
    /// `ProcessHandleSummary::new` sets it to `"process"`.
    #[serde(rename = "__handle__")]
    pub handle_type: String,
    /// Process id; `ProcessHandleSummary::new` sets it to the same value as
    /// `process_id`.
    pub id: ProcessId,
    /// Id of the process the handle refers to.
    pub process_id: ProcessId,
    /// Descriptor (kind / label) of the handle.
    pub descriptor: ProcessHandleDescriptor,
    /// Definition summary, if any; omitted from the serialized form when `None`.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub definition: Option<ProcessDefinitionSummary>,
    /// Lifecycle status of the process.
    pub status: ProcessLifecycleStatus,
}
871
872impl ProcessHandleSummary {
873 pub fn new(
874 process_id: impl Into<ProcessId>,
875 descriptor: ProcessHandleDescriptor,
876 status: ProcessLifecycleStatus,
877 ) -> Self {
878 let process_id = process_id.into();
879 Self {
880 handle_type: "process".to_string(),
881 id: process_id.clone(),
882 process_id,
883 descriptor,
884 definition: None,
885 status,
886 }
887 }
888
889 pub fn with_definition(mut self, definition: Option<ProcessDefinitionSummary>) -> Self {
890 self.definition = definition;
891 self
892 }
893
894 pub fn from_grant_record(grant: ProcessHandleGrant, record: ProcessRecord) -> Self {
895 let definition = ProcessDefinitionSummary::from_input(record.input.as_ref());
896 Self::new(
897 record.id,
898 grant.descriptor,
899 ProcessLifecycleStatus::from(record.status),
900 )
901 .with_definition(definition)
902 }
903}
904
905impl From<ProcessHandleGrantEntry> for ProcessHandleSummary {
906 fn from((grant, record): ProcessHandleGrantEntry) -> Self {
907 Self::from_grant_record(grant, record)
908 }
909}
910
/// Summary returned for a cancel operation: the process id and its
/// lifecycle status.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub struct ProcessCancelSummary {
    /// Id of the process.
    pub process_id: ProcessId,
    /// Lifecycle status taken from the process record.
    pub status: ProcessLifecycleStatus,
}
916
917impl ProcessCancelSummary {
918 pub fn from_record(record: ProcessRecord) -> Self {
919 Self {
920 process_id: record.id,
921 status: ProcessLifecycleStatus::from(record.status),
922 }
923 }
924}
925
/// Summary of a process definition: just its name.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub struct ProcessDefinitionSummary {
    /// Name of the process definition.
    pub name: String,
}
930
931impl ProcessDefinitionSummary {
932 pub fn from_input(input: &ProcessInput) -> Option<Self> {
933 match input {
934 ProcessInput::LashlangProcess { process_name, .. } => Some(Self {
935 name: process_name.clone(),
936 }),
937 ProcessInput::ToolCall { .. }
938 | ProcessInput::SessionTurn { .. }
939 | ProcessInput::External { .. } => None,
940 }
941 }
942}
943
/// Selector identifying a lashlang process definition by its module ref,
/// host requirements ref, process ref and process name.
///
/// Used by `ProcessListFilter` to match records whose input is that
/// definition.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct ProcessDefinitionSelector {
    module_ref: lashlang::ModuleRef,
    host_requirements_ref: lashlang::HostRequirementsRef,
    process_ref: lashlang::ProcessRef,
    process_name: String,
}
951
952impl ProcessDefinitionSelector {
953 pub fn new(
954 module_ref: lashlang::ModuleRef,
955 host_requirements_ref: lashlang::HostRequirementsRef,
956 process_ref: lashlang::ProcessRef,
957 process_name: impl Into<String>,
958 ) -> Self {
959 Self {
960 module_ref,
961 host_requirements_ref,
962 process_ref,
963 process_name: process_name.into(),
964 }
965 }
966
967 pub fn module_ref(&self) -> &lashlang::ModuleRef {
968 &self.module_ref
969 }
970
971 pub fn host_requirements_ref(&self) -> &lashlang::HostRequirementsRef {
972 &self.host_requirements_ref
973 }
974
975 pub fn process_ref(&self) -> &lashlang::ProcessRef {
976 &self.process_ref
977 }
978
979 pub fn process_name(&self) -> &str {
980 &self.process_name
981 }
982
983 pub fn decode(value: &serde_json::Value) -> Result<Self, String> {
984 if value
985 .get(lashlang::LASH_PROCESS_VALUE_KEY)
986 .and_then(serde_json::Value::as_bool)
987 != Some(true)
988 {
989 return Err("definition must be a process definition value".to_string());
990 }
991 Ok(Self {
992 module_ref: decode_process_definition_field(
993 value,
994 lashlang::LASH_MODULE_REF_KEY,
995 "definition",
996 )?,
997 host_requirements_ref: decode_process_definition_field(
998 value,
999 lashlang::LASH_HOST_REQUIREMENTS_REF_KEY,
1000 "definition",
1001 )?,
1002 process_ref: decode_process_definition_field(
1003 value,
1004 lashlang::LASH_PROCESS_REF_KEY,
1005 "definition",
1006 )?,
1007 process_name: value
1008 .get(lashlang::LASH_PROCESS_NAME_KEY)
1009 .and_then(serde_json::Value::as_str)
1010 .ok_or_else(|| "definition is missing its process name".to_string())?
1011 .to_string(),
1012 })
1013 }
1014
1015 pub fn matches_input(&self, input: &ProcessInput) -> bool {
1016 match input {
1017 ProcessInput::LashlangProcess {
1018 module_ref,
1019 process_ref,
1020 host_requirements_ref,
1021 process_name,
1022 ..
1023 } => {
1024 self.module_ref == *module_ref
1025 && self.host_requirements_ref == *host_requirements_ref
1026 && self.process_ref == *process_ref
1027 && self.process_name == *process_name
1028 }
1029 ProcessInput::ToolCall { .. }
1030 | ProcessInput::SessionTurn { .. }
1031 | ProcessInput::External { .. } => false,
1032 }
1033 }
1034}
1035
1036fn decode_process_definition_field<T: serde::de::DeserializeOwned>(
1037 value: &serde_json::Value,
1038 field: &'static str,
1039 label: &'static str,
1040) -> Result<T, String> {
1041 serde_json::from_value(
1042 value
1043 .get(field)
1044 .cloned()
1045 .ok_or_else(|| format!("{label} is missing {field}"))?,
1046 )
1047 .map_err(|err| format!("{label} has invalid {field}: {err}"))
1048}
1049
/// Status filter for `processes.list`: one specific lifecycle status, or
/// `Any`. The default is `Running`.
#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)]
pub enum ProcessStatusFilter {
    #[default]
    Running,
    Completed,
    Failed,
    Cancelled,
    /// Matches every lifecycle status.
    Any,
}
1059
1060impl ProcessStatusFilter {
1061 pub fn decode(value: Option<&str>) -> Result<Self, String> {
1062 match value.unwrap_or("running") {
1063 "running" => Ok(Self::Running),
1064 "completed" => Ok(Self::Completed),
1065 "failed" => Ok(Self::Failed),
1066 "cancelled" => Ok(Self::Cancelled),
1067 "any" => Ok(Self::Any),
1068 other => Err(format!(
1069 "processes.list status must be `running`, `completed`, `failed`, `cancelled`, or `any`, got `{other}`"
1070 )),
1071 }
1072 }
1073
1074 pub fn list_mode(self) -> ProcessListMode {
1075 match self {
1076 Self::Running => ProcessListMode::Live,
1077 Self::Completed | Self::Failed | Self::Cancelled | Self::Any => ProcessListMode::All,
1078 }
1079 }
1080
1081 pub fn matches(self, status: ProcessLifecycleStatus) -> bool {
1082 match self {
1083 Self::Running => status == ProcessLifecycleStatus::Running,
1084 Self::Completed => status == ProcessLifecycleStatus::Completed,
1085 Self::Failed => status == ProcessLifecycleStatus::Failed,
1086 Self::Cancelled => status == ProcessLifecycleStatus::Cancelled,
1087 Self::Any => true,
1088 }
1089 }
1090}
1091
/// Filters accepted by `processes.list`.
///
/// A record matches when it passes the status filter and every optional
/// filter that is set.
#[derive(Clone, Debug, Default, PartialEq, Eq)]
pub struct ProcessListFilter {
    /// Restricts matches to records whose input is this definition.
    pub definition: Option<ProcessDefinitionSelector>,
    /// Lifecycle status filter; defaults to `Running`.
    pub status: ProcessStatusFilter,
    /// When set, restricts matches to records that are (`true`) or are not
    /// (`false`) waiting.
    pub waiting: Option<bool>,
}
1098
1099impl ProcessListFilter {
1100 pub fn decode(args: &serde_json::Value) -> Result<Self, String> {
1101 let map = args
1102 .as_object()
1103 .ok_or_else(|| "processes.list expects a record of process filters".to_string())?;
1104 for key in map.keys() {
1105 match key.as_str() {
1106 "definition" | "status" | "waiting" => {}
1107 _ => return Err(format!("processes.list unknown filter `{key}`")),
1108 }
1109 }
1110 let definition = args
1111 .get("definition")
1112 .map(ProcessDefinitionSelector::decode)
1113 .transpose()?;
1114 let status =
1115 ProcessStatusFilter::decode(args.get("status").and_then(serde_json::Value::as_str))?;
1116 let waiting = args
1117 .get("waiting")
1118 .map(|value| {
1119 value
1120 .as_bool()
1121 .ok_or_else(|| "processes.list `waiting` filter must be a boolean".to_string())
1122 })
1123 .transpose()?;
1124 Ok(Self {
1125 definition,
1126 status,
1127 waiting,
1128 })
1129 }
1130
1131 pub fn list_mode(&self) -> ProcessListMode {
1132 self.status.list_mode()
1133 }
1134
1135 pub fn matches_entry(&self, entry: &ProcessHandleGrantEntry) -> bool {
1136 let (_grant, record) = entry;
1137 self.matches_record(record)
1138 }
1139
1140 pub fn matches_record(&self, record: &ProcessRecord) -> bool {
1141 let status = ProcessLifecycleStatus::from(&record.status);
1142 self.status.matches(status)
1143 && self
1144 .definition
1145 .as_ref()
1146 .is_none_or(|definition| definition.matches_input(record.input.as_ref()))
1147 && self
1148 .waiting
1149 .is_none_or(|waiting| record.wait.is_some() == waiting)
1150 }
1151}
1152
/// Listing mode derived from the status filter.
///
/// `ProcessStatusFilter::list_mode` selects `Live` for the `Running` filter
/// and `All` for every other filter. Serialized as a snake_case string; the
/// default is `Live`.
#[derive(Clone, Copy, Debug, Default, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum ProcessListMode {
    #[default]
    Live,
    All,
}
1160
1161impl ProcessListMode {
1162 pub fn as_str(self) -> &'static str {
1163 match self {
1164 Self::Live => "live",
1165 Self::All => "all",
1166 }
1167 }
1168}
1169
/// Handle grant requested together with a process start: the session scope
/// receiving the handle and the handle's descriptor.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub struct ProcessStartGrant {
    /// Session scope the handle is granted to.
    pub session_scope: SessionScope,
    /// Descriptor (kind / label) of the handle.
    pub descriptor: ProcessHandleDescriptor,
}
1175
/// Report describing what happened to process state when a session was
/// deleted.
// NOTE(review): the field notes below follow the field names only; the code
// that fills this report is not in this file.
#[derive(Clone, Debug, Default, PartialEq, Eq, Serialize, Deserialize)]
pub struct ProcessSessionDeleteReport {
    /// Id of the deleted session.
    pub session_id: String,
    /// Number of handles revoked.
    pub revoked_handle_count: usize,
    /// Number of wakes deleted.
    pub deleted_wake_count: usize,
    /// Ids of processes reported as orphaned.
    pub orphaned_process_ids: Vec<String>,
    /// Ids of processes reported as preserved.
    pub preserved_process_ids: Vec<String>,
}
1184
#[cfg(test)]
mod tests {
    use serde_json::json;

    use super::*;

    /// Builds a `ProcessRef` from a component hash seed and a position.
    ///
    /// `pos` is taken as `u32` (the field's own type) so no narrowing cast is
    /// needed at the construction site.
    fn process_ref(component: &str, pos: u32) -> lashlang::ProcessRef {
        lashlang::ProcessRef {
            component: lashlang::ContentHash::new(component),
            pos,
        }
    }

    /// Builds the JSON object form of a lashlang process value, as accepted
    /// by the `definition` facet of `ProcessListFilter::decode`.
    fn process_value(
        module_ref: &lashlang::ModuleRef,
        host_requirements_ref: &lashlang::HostRequirementsRef,
        process_ref: &lashlang::ProcessRef,
        name: &str,
    ) -> serde_json::Value {
        let mut value = serde_json::Map::new();
        value.insert(lashlang::LASH_PROCESS_VALUE_KEY.to_string(), json!(true));
        value.insert(lashlang::LASH_MODULE_REF_KEY.to_string(), json!(module_ref));
        value.insert(
            lashlang::LASH_HOST_REQUIREMENTS_REF_KEY.to_string(),
            json!(host_requirements_ref),
        );
        value.insert(
            lashlang::LASH_PROCESS_REF_KEY.to_string(),
            json!(process_ref),
        );
        value.insert(lashlang::LASH_PROCESS_NAME_KEY.to_string(), json!(name));
        serde_json::Value::Object(value)
    }

    /// Returns a `Completed` status whose await output is a successful tool
    /// output carrying `true`.
    fn completed_status() -> ProcessStatus {
        ProcessStatus::Completed {
            await_output: ProcessAwaitOutput::from_tool_output(crate::ToolCallOutput::success(
                json!(true),
            )),
        }
    }

    /// Builds a `(grant, record)` entry for a lashlang process with the given
    /// identity and status. The record starts with no arguments and a
    /// test-only execution environment reference derived from `process_id`.
    fn lashlang_entry(
        process_id: &str,
        module_ref: lashlang::ModuleRef,
        host_requirements_ref: lashlang::HostRequirementsRef,
        process_ref: lashlang::ProcessRef,
        process_name: &str,
        status: ProcessStatus,
    ) -> ProcessHandleGrantEntry {
        let mut record = ProcessRecord::from_registration(
            ProcessRegistration::new(
                process_id,
                ProcessInput::LashlangProcess {
                    module_ref,
                    process_ref,
                    host_requirements_ref,
                    process_name: process_name.to_string(),
                    args: serde_json::Map::new(),
                },
                ProcessProvenance::host(),
            )
            .with_execution_env_ref(Some(ProcessExecutionEnvRef::new(format!(
                "process-env:test:{process_id}"
            )))),
        );
        // Override the status chosen by registration with the one under test.
        record.status = status;
        (
            ProcessHandleGrant {
                session_id: "session".to_string(),
                process_id: process_id.to_string(),
                descriptor: ProcessHandleDescriptor::new(Some("lashlang"), Some(process_name)),
            },
            record,
        )
    }

    #[test]
    fn process_list_filter_matches_definition_and_status() {
        let module_ref = lashlang::ModuleRef::new(&lashlang::ContentHash::new("module"));
        let host_requirements_ref =
            lashlang::HostRequirementsRef::new(&lashlang::ContentHash::new("surface"));
        let target_ref = process_ref("target", 0);
        let other_ref = process_ref("other", 1);
        let filter = ProcessListFilter::decode(&json!({
            "definition": process_value(&module_ref, &host_requirements_ref, &target_ref, "target"),
            "status": "completed"
        }))
        .expect("decode filter");

        let matching = lashlang_entry(
            "matching",
            module_ref.clone(),
            host_requirements_ref.clone(),
            target_ref,
            "target",
            completed_status(),
        );
        // Same module and status, but a different process definition.
        let wrong_definition = lashlang_entry(
            "wrong-definition",
            module_ref,
            host_requirements_ref,
            other_ref,
            "other",
            completed_status(),
        );

        assert_eq!(filter.list_mode(), ProcessListMode::All);
        assert!(filter.matches_entry(&matching));
        assert!(!filter.matches_entry(&wrong_definition));
    }

    #[test]
    fn process_list_filter_matches_waiting_facet() {
        let module_ref = lashlang::ModuleRef::new(&lashlang::ContentHash::new("module"));
        let host_requirements_ref =
            lashlang::HostRequirementsRef::new(&lashlang::ContentHash::new("surface"));
        // Named `target_ref` so the `process_ref` helper is not shadowed.
        let target_ref = process_ref("target", 0);
        let mut waiting_entry = lashlang_entry(
            "waiting",
            module_ref.clone(),
            host_requirements_ref.clone(),
            target_ref.clone(),
            "target",
            ProcessStatus::Running,
        );
        waiting_entry.1.wait = Some(WaitState {
            since_ms: 42,
            kind: WaitKind::Signal {
                name: "ready".to_string(),
                event_type: "signal.ready".to_string(),
                key: "process:waiting:signal.ready:1".to_string(),
                ordinal: 1,
            },
        });
        let idle_entry = lashlang_entry(
            "idle",
            module_ref,
            host_requirements_ref,
            target_ref,
            "target",
            ProcessStatus::Running,
        );
        let waiting_filter =
            ProcessListFilter::decode(&json!({ "waiting": true })).expect("decode waiting filter");
        let idle_filter =
            ProcessListFilter::decode(&json!({ "waiting": false })).expect("decode idle filter");

        assert_eq!(waiting_filter.list_mode(), ProcessListMode::Live);
        assert!(waiting_filter.matches_entry(&waiting_entry));
        assert!(!waiting_filter.matches_entry(&idle_entry));
        assert!(!idle_filter.matches_entry(&waiting_entry));
        assert!(idle_filter.matches_entry(&idle_entry));
        // A non-boolean `waiting` value must be rejected at decode time.
        assert!(
            ProcessListFilter::decode(&json!({ "waiting": "yes" }))
                .expect_err("invalid waiting filter")
                .contains("must be a boolean")
        );
    }
}