1use std::fmt;
2use std::sync::Arc;
3
4use serde::{Deserialize, Serialize};
5
6use super::events::{
7 ProcessAwaitOutput, ProcessEventType, ProcessTerminalSemantics, ProcessTerminalState,
8 default_process_event_types,
9};
10use super::time::current_epoch_ms;
11use super::validation::{
12 ensure_core_event_types, process_registration_hash, validate_process_registration,
13};
14
/// Identifier of a process; a plain alias for an owned [`String`].
pub type ProcessId = String;
16
17#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize)]
18#[serde(transparent)]
19pub struct SessionScopeId(String);
20
21impl SessionScopeId {
22 pub fn new(value: impl Into<String>) -> Self {
23 Self(value.into())
24 }
25
26 pub fn as_str(&self) -> &str {
27 &self.0
28 }
29}
30
31impl fmt::Display for SessionScopeId {
32 fn fmt(&self, formatter: &mut fmt::Formatter<'_>) -> fmt::Result {
33 formatter.write_str(&self.0)
34 }
35}
36
37impl From<String> for SessionScopeId {
38 fn from(value: String) -> Self {
39 Self::new(value)
40 }
41}
42
43impl From<&str> for SessionScopeId {
44 fn from(value: &str) -> Self {
45 Self::new(value)
46 }
47}
48
49#[derive(Debug, Serialize, Deserialize)]
51#[serde(tag = "type", rename_all = "snake_case")]
52pub enum ProcessInput {
53 ToolCall {
54 call: crate::PreparedToolCall,
55 },
56 LashlangProcess {
57 module_ref: lashlang::ModuleRef,
58 process_ref: lashlang::ProcessRef,
59 host_requirements_ref: lashlang::HostRequirementsRef,
60 process_name: String,
61 #[serde(default)]
62 args: serde_json::Map<String, serde_json::Value>,
63 },
64 SessionTurn {
65 create_request: Box<crate::SessionCreateRequest>,
66 turn_input: Box<crate::TurnInput>,
67 output_contract: crate::ToolOutputContract,
68 },
69 External {
70 #[serde(default)]
71 metadata: serde_json::Value,
72 },
73}
74
75impl Clone for ProcessInput {
76 fn clone(&self) -> Self {
77 match self {
78 Self::ToolCall { call } => Self::ToolCall { call: call.clone() },
79 Self::LashlangProcess {
80 module_ref,
81 process_ref,
82 host_requirements_ref,
83 process_name,
84 args,
85 } => Self::LashlangProcess {
86 module_ref: module_ref.clone(),
87 process_ref: process_ref.clone(),
88 host_requirements_ref: host_requirements_ref.clone(),
89 process_name: process_name.clone(),
90 args: args.clone(),
91 },
92 Self::SessionTurn {
93 create_request,
94 turn_input,
95 output_contract,
96 } => Self::SessionTurn {
97 create_request: create_request.clone(),
98 turn_input: turn_input.clone(),
99 output_contract: output_contract.clone(),
100 },
101 Self::External { metadata } => Self::External {
102 metadata: metadata.clone(),
103 },
104 }
105 }
106}
107
108#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
109#[serde(transparent)]
110pub struct ProcessExecutionEnvRef(String);
111
112impl ProcessExecutionEnvRef {
113 pub fn new(value: impl Into<String>) -> Self {
114 Self(value.into())
115 }
116
117 pub fn as_str(&self) -> &str {
118 &self.0
119 }
120}
121
122impl fmt::Display for ProcessExecutionEnvRef {
123 fn fmt(&self, formatter: &mut fmt::Formatter<'_>) -> fmt::Result {
124 formatter.write_str(&self.0)
125 }
126}
127
128#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
129#[serde(deny_unknown_fields)]
130pub struct ProcessExecutionEnvSpec {
131 #[serde(default)]
132 pub plugin_options: crate::PluginOptions,
133 #[serde(default)]
134 pub policy: crate::SessionPolicy,
135}
136
137impl ProcessExecutionEnvSpec {
138 pub fn new(plugin_options: crate::PluginOptions, policy: crate::SessionPolicy) -> Self {
139 Self {
140 plugin_options,
141 policy,
142 }
143 }
144
145 pub fn stable_ref(&self) -> Result<ProcessExecutionEnvRef, serde_json::Error> {
146 crate::stable_hash::stable_json_sha256_hex(self)
147 .map(|hash| ProcessExecutionEnvRef::new(format!("process-env:sha256:{hash}")))
148 }
149
150 pub fn to_store_bytes(&self) -> Result<Vec<u8>, serde_json::Error> {
151 serde_json::to_vec(self)
152 }
153
154 pub fn from_store_bytes(bytes: &[u8]) -> Result<Self, serde_json::Error> {
155 serde_json::from_slice(bytes)
156 }
157}
158
159pub async fn persist_process_execution_env(
160 artifact_store: &dyn lashlang::LashlangArtifactStore,
161 spec: &ProcessExecutionEnvSpec,
162) -> Result<ProcessExecutionEnvRef, crate::PluginError> {
163 let env_ref = spec.stable_ref().map_err(|err| {
164 crate::PluginError::Session(format!("failed to hash process execution env: {err}"))
165 })?;
166 let bytes = spec.to_store_bytes().map_err(|err| {
167 crate::PluginError::Session(format!("failed to encode process execution env: {err}"))
168 })?;
169 artifact_store
170 .put_artifact_bytes(env_ref.as_str(), "process_execution_env", &bytes)
171 .await
172 .map_err(|err| {
173 crate::PluginError::Session(format!(
174 "failed to persist process execution env `{env_ref}`: {err}"
175 ))
176 })?;
177 Ok(env_ref)
178}
179
180pub async fn load_process_execution_env(
181 artifact_store: &dyn lashlang::LashlangArtifactStore,
182 env_ref: &ProcessExecutionEnvRef,
183) -> Result<ProcessExecutionEnvSpec, crate::PluginError> {
184 let bytes = artifact_store
185 .get_artifact_bytes(env_ref.as_str())
186 .await
187 .map_err(|err| {
188 crate::PluginError::Session(format!(
189 "failed to load process execution env `{env_ref}`: {err}"
190 ))
191 })?
192 .ok_or_else(|| {
193 crate::PluginError::Session(format!("missing process execution env `{env_ref}`"))
194 })?;
195 ProcessExecutionEnvSpec::from_store_bytes(&bytes).map_err(|err| {
196 crate::PluginError::Session(format!(
197 "failed to decode process execution env `{env_ref}`: {err}"
198 ))
199 })
200}
201
202#[derive(Clone, Debug, Default, Serialize, Deserialize)]
205pub struct ProcessExecutionContext {
206 #[serde(default, skip_serializing_if = "Option::is_none")]
207 pub causal_invocation: Option<crate::RuntimeInvocation>,
208}
209
210impl ProcessExecutionContext {
211 pub fn with_causal_invocation(mut self, invocation: Option<crate::RuntimeInvocation>) -> Self {
212 self.causal_invocation = invocation;
213 self
214 }
215
216 pub fn is_empty(&self) -> bool {
217 self.causal_invocation.is_none()
218 }
219}
220
221#[derive(Clone)]
222pub struct ProcessOpScope<'scope> {
223 pub(crate) parent_invocation: Option<crate::RuntimeInvocation>,
224 pub(crate) effect_controller: crate::runtime::RuntimeEffectControllerHandle<'scope>,
225 pub(crate) agent_frame_id: Option<crate::AgentFrameId>,
226 pub(crate) target_agent_frame_id: Option<crate::AgentFrameId>,
227}
228
229impl<'scope> ProcessOpScope<'scope> {
230 pub fn new(scoped_effect_controller: crate::ScopedEffectController<'scope>) -> Self {
231 Self {
232 parent_invocation: None,
233 effect_controller: crate::runtime::RuntimeEffectControllerHandle::borrowed(
234 scoped_effect_controller,
235 ),
236 agent_frame_id: None,
237 target_agent_frame_id: None,
238 }
239 }
240
241 pub fn with_parent_invocation(
242 mut self,
243 parent_invocation: Option<crate::RuntimeInvocation>,
244 ) -> Self {
245 self.parent_invocation = parent_invocation;
246 self
247 }
248
249 pub fn with_agent_frame_id(mut self, agent_frame_id: Option<crate::AgentFrameId>) -> Self {
250 self.agent_frame_id = agent_frame_id;
251 self
252 }
253
254 pub fn with_target_agent_frame_id(
255 mut self,
256 agent_frame_id: Option<crate::AgentFrameId>,
257 ) -> Self {
258 self.target_agent_frame_id = agent_frame_id;
259 self
260 }
261
262 pub fn agent_frame_id(&self) -> Option<&str> {
263 self.agent_frame_id.as_deref()
264 }
265
266 pub fn target_agent_frame_id(&self) -> Option<&str> {
267 self.target_agent_frame_id.as_deref()
268 }
269
270 pub(crate) fn controller(&self) -> &dyn crate::RuntimeEffectController {
271 self.effect_controller.controller()
272 }
273}
274
275#[derive(Clone, Debug, Default)]
276pub struct ProcessStartOptions {
277 pub descriptor: Option<ProcessHandleDescriptor>,
278 pub spawn_provenance: Option<ProcessSpawnProvenance>,
286}
287
288#[derive(Clone, Debug, PartialEq, Eq)]
293pub struct ProcessSpawnProvenance {
294 pub originator: ProcessOriginator,
295 pub wake_target: Option<SessionScope>,
296}
297
298impl ProcessStartOptions {
299 pub fn new() -> Self {
300 Self::default()
301 }
302
303 pub fn with_descriptor(mut self, descriptor: ProcessHandleDescriptor) -> Self {
304 self.descriptor = Some(descriptor);
305 self
306 }
307
308 pub fn with_optional_descriptor(mut self, descriptor: Option<ProcessHandleDescriptor>) -> Self {
309 self.descriptor = descriptor;
310 self
311 }
312
313 pub fn with_spawn_provenance(mut self, spawn_provenance: ProcessSpawnProvenance) -> Self {
314 self.spawn_provenance = Some(spawn_provenance);
315 self
316 }
317
318 pub fn execution_context(&self, scope: &ProcessOpScope<'_>) -> ProcessExecutionContext {
319 ProcessExecutionContext {
320 causal_invocation: scope.parent_invocation.clone(),
321 }
322 }
323}
324
325#[derive(Clone, Debug, Serialize, Deserialize)]
327pub struct ProcessStartRequest {
328 pub id: ProcessId,
329 pub input: ProcessInput,
330 #[serde(default, skip_serializing_if = "Option::is_none")]
331 pub env_spec: Option<ProcessExecutionEnvSpec>,
332 pub originator: ProcessOriginator,
333 #[serde(default, skip_serializing_if = "Option::is_none")]
334 pub wake_target: Option<SessionScope>,
335 #[serde(default, skip_serializing_if = "Option::is_none")]
336 pub grant: Option<ProcessStartGrant>,
337 #[serde(default)]
338 pub event_types: Vec<ProcessEventType>,
339}
340
341impl ProcessStartRequest {
342 pub fn new(
343 id: impl Into<ProcessId>,
344 input: ProcessInput,
345 originator: ProcessOriginator,
346 ) -> Self {
347 Self {
348 id: id.into(),
349 input,
350 env_spec: None,
351 originator,
352 wake_target: None,
353 grant: None,
354 event_types: default_process_event_types(),
355 }
356 }
357
358 pub fn external(
359 id: impl Into<ProcessId>,
360 originator: ProcessOriginator,
361 metadata: serde_json::Value,
362 ) -> Self {
363 Self::new(id, ProcessInput::External { metadata }, originator)
364 }
365
366 pub fn with_env_spec(mut self, env_spec: ProcessExecutionEnvSpec) -> Self {
367 self.env_spec = Some(env_spec);
368 self
369 }
370
371 pub fn with_wake_target(mut self, wake_target: Option<SessionScope>) -> Self {
372 self.wake_target = wake_target;
373 self
374 }
375
376 pub fn with_grant(mut self, grant: Option<ProcessStartGrant>) -> Self {
377 self.grant = grant;
378 self
379 }
380
381 pub fn with_event_types(
382 mut self,
383 event_types: impl IntoIterator<Item = ProcessEventType>,
384 ) -> Self {
385 self.event_types = event_types.into_iter().collect();
386 self
387 }
388
389 pub fn with_extra_event_types(
390 mut self,
391 event_types: impl IntoIterator<Item = ProcessEventType>,
392 ) -> Self {
393 self.event_types.extend(event_types);
394 self
395 }
396
397 pub fn into_registration(self, env_ref: Option<ProcessExecutionEnvRef>) -> ProcessRegistration {
398 ProcessRegistration::new(self.id, self.input, ProcessProvenance::new(self.originator))
399 .with_event_types(self.event_types)
400 .with_execution_env_ref(env_ref)
401 .with_wake_target(self.wake_target)
402 }
403}
404
405#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
406pub struct SessionScope {
407 pub session_id: String,
408 #[serde(default, skip_serializing_if = "Option::is_none")]
409 pub agent_frame_id: Option<crate::AgentFrameId>,
410}
411
412#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
413pub struct ProcessProvenance {
414 pub originator: ProcessOriginator,
415 #[serde(default, skip_serializing_if = "Option::is_none")]
416 pub caused_by: Option<crate::CausalRef>,
417}
418
419impl ProcessProvenance {
420 pub fn new(originator: ProcessOriginator) -> Self {
421 Self {
422 originator,
423 caused_by: None,
424 }
425 }
426
427 pub fn host() -> Self {
428 Self::new(ProcessOriginator::host())
429 }
430
431 pub fn session(scope: SessionScope) -> Self {
432 Self::new(ProcessOriginator::session(scope))
433 }
434
435 pub fn with_caused_by(mut self, caused_by: Option<crate::CausalRef>) -> Self {
436 self.caused_by = caused_by;
437 self
438 }
439}
440
441#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
442#[serde(tag = "type", rename_all = "snake_case")]
443pub enum ProcessOriginator {
444 Host,
445 Session { scope: SessionScope },
446}
447
448impl ProcessOriginator {
449 pub fn host() -> Self {
450 Self::Host
451 }
452
453 pub fn session(scope: SessionScope) -> Self {
454 Self::Session { scope }
455 }
456
457 pub fn scope_id(&self) -> String {
458 match self {
459 Self::Host => "host".to_string(),
460 Self::Session { scope } => scope.id().to_string(),
461 }
462 }
463}
464
465impl SessionScope {
466 pub fn new(session_id: impl Into<String>) -> Self {
467 Self {
468 session_id: session_id.into(),
469 agent_frame_id: None,
470 }
471 }
472
473 pub fn for_agent_frame(
474 session_id: impl Into<String>,
475 agent_frame_id: impl Into<crate::AgentFrameId>,
476 ) -> Self {
477 Self {
478 session_id: session_id.into(),
479 agent_frame_id: Some(agent_frame_id.into()),
480 }
481 }
482
483 pub fn id(&self) -> SessionScopeId {
484 match self.agent_frame_id.as_deref() {
485 Some(frame_id) if !frame_id.is_empty() => {
486 SessionScopeId::new(format!("session:{}/frame:{frame_id}", self.session_id))
487 }
488 _ => SessionScopeId::new(format!("session:{}", self.session_id)),
489 }
490 }
491
492 pub fn is_empty(&self) -> bool {
493 self.session_id.is_empty()
494 }
495}
496
497#[derive(Debug, Serialize, Deserialize)]
499pub struct ProcessRegistration {
500 pub id: ProcessId,
501 pub input: Arc<ProcessInput>,
502 #[serde(default)]
503 pub event_types: Vec<ProcessEventType>,
504 pub provenance: ProcessProvenance,
505 #[serde(default, skip_serializing_if = "Option::is_none")]
506 pub env_ref: Option<ProcessExecutionEnvRef>,
507 #[serde(default, skip_serializing_if = "Option::is_none")]
508 pub wake_target: Option<SessionScope>,
509}
510
511impl Clone for ProcessRegistration {
512 fn clone(&self) -> Self {
513 Self {
514 id: self.id.clone(),
515 input: Arc::clone(&self.input),
516 event_types: self.event_types.clone(),
517 provenance: self.provenance.clone(),
518 env_ref: self.env_ref.clone(),
519 wake_target: self.wake_target.clone(),
520 }
521 }
522}
523
524impl ProcessRegistration {
525 pub fn new(
526 id: impl Into<ProcessId>,
527 input: ProcessInput,
528 provenance: ProcessProvenance,
529 ) -> Self {
530 Self {
531 id: id.into(),
532 input: Arc::new(input),
533 event_types: default_process_event_types(),
534 provenance,
535 env_ref: None,
536 wake_target: None,
537 }
538 }
539
540 pub(crate) fn session_start_draft(id: impl Into<ProcessId>, input: ProcessInput) -> Self {
541 Self::new(id, input, ProcessProvenance::host())
542 }
543
544 pub fn with_process_provenance(mut self, provenance: ProcessProvenance) -> Self {
545 self.provenance = provenance;
546 self
547 }
548
549 pub fn with_execution_env_ref(mut self, env_ref: Option<ProcessExecutionEnvRef>) -> Self {
550 self.env_ref = env_ref;
551 self
552 }
553
554 pub fn with_wake_target(mut self, wake_target: Option<SessionScope>) -> Self {
555 self.wake_target = wake_target;
556 self
557 }
558
559 pub fn with_event_types(
560 mut self,
561 event_types: impl IntoIterator<Item = ProcessEventType>,
562 ) -> Self {
563 self.event_types = event_types.into_iter().collect();
564 self
565 }
566
567 pub fn with_extra_event_types(
568 mut self,
569 event_types: impl IntoIterator<Item = ProcessEventType>,
570 ) -> Self {
571 self.event_types.extend(event_types);
572 self
573 }
574}
575
576#[derive(Clone, Debug, Default, PartialEq, Serialize, Deserialize)]
577#[serde(tag = "state", rename_all = "snake_case")]
578pub enum ProcessStatus {
579 #[default]
580 Running,
581 Completed {
582 await_output: ProcessAwaitOutput,
583 },
584 Failed {
585 await_output: ProcessAwaitOutput,
586 },
587 Cancelled {
588 await_output: ProcessAwaitOutput,
589 },
590}
591
592impl ProcessStatus {
593 pub fn from_terminal(terminal: ProcessTerminalSemantics) -> Self {
594 match terminal.state {
595 ProcessTerminalState::Completed => Self::Completed {
596 await_output: terminal.await_output,
597 },
598 ProcessTerminalState::Failed => Self::Failed {
599 await_output: terminal.await_output,
600 },
601 ProcessTerminalState::Cancelled => Self::Cancelled {
602 await_output: terminal.await_output,
603 },
604 }
605 }
606
607 pub fn is_terminal(&self) -> bool {
608 !matches!(self, Self::Running)
609 }
610
611 pub fn label(&self) -> &'static str {
612 match self {
613 Self::Running => "running",
614 Self::Completed { .. } => "completed",
615 Self::Failed { .. } => "failed",
616 Self::Cancelled { .. } => "cancelled",
617 }
618 }
619
620 pub fn terminal_state(&self) -> Option<ProcessTerminalState> {
621 match self {
622 Self::Running => None,
623 Self::Completed { .. } => Some(ProcessTerminalState::Completed),
624 Self::Failed { .. } => Some(ProcessTerminalState::Failed),
625 Self::Cancelled { .. } => Some(ProcessTerminalState::Cancelled),
626 }
627 }
628
629 pub fn await_output(&self) -> Option<&ProcessAwaitOutput> {
630 match self {
631 Self::Running => None,
632 Self::Completed { await_output }
633 | Self::Failed { await_output }
634 | Self::Cancelled { await_output } => Some(await_output),
635 }
636 }
637
638 pub fn terminal_semantics(&self) -> Option<ProcessTerminalSemantics> {
639 Some(ProcessTerminalSemantics {
640 state: self.terminal_state()?,
641 await_output: self.await_output()?.clone(),
642 })
643 }
644}
645
646#[derive(Clone, Debug, Serialize, Deserialize)]
649pub struct ProcessRecord {
650 pub id: ProcessId,
651 pub registration_hash: String,
652 pub input: Arc<ProcessInput>,
653 #[serde(default)]
654 pub event_types: Vec<ProcessEventType>,
655 pub provenance: ProcessProvenance,
656 #[serde(default, skip_serializing_if = "Option::is_none")]
657 pub env_ref: Option<ProcessExecutionEnvRef>,
658 #[serde(default, skip_serializing_if = "Option::is_none")]
659 pub wake_target: Option<SessionScope>,
660 #[serde(default)]
661 pub created_at_ms: u64,
662 #[serde(default)]
663 pub updated_at_ms: u64,
664 #[serde(default, skip_serializing_if = "Option::is_none")]
665 pub external_ref: Option<ProcessExternalRef>,
666 #[serde(default, skip_serializing_if = "Option::is_none")]
667 pub wait: Option<WaitState>,
668 #[serde(default)]
669 pub status: ProcessStatus,
670}
671
672#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
673pub struct WaitState {
674 pub kind: WaitKind,
675 pub since_ms: u64,
676}
677
678#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
679#[serde(tag = "kind", rename_all = "snake_case")]
680pub enum WaitKind {
681 Signal {
682 name: String,
683 event_type: String,
684 key: String,
685 ordinal: u64,
686 },
687}
688
689impl ProcessRecord {
690 pub fn from_registration(mut registration: ProcessRegistration) -> Self {
691 ensure_core_event_types(&mut registration);
692 validate_process_registration(®istration)
693 .expect("process registration should be valid before record construction");
694 let registration_hash = process_registration_hash(®istration)
695 .expect("process registration should hash before record construction");
696 Self::from_prepared_registration(registration, registration_hash, current_epoch_ms())
697 }
698
699 pub fn from_prepared_registration(
700 registration: ProcessRegistration,
701 registration_hash: String,
702 now_ms: u64,
703 ) -> Self {
704 Self {
705 id: registration.id,
706 registration_hash,
707 input: registration.input,
708 event_types: registration.event_types,
709 provenance: registration.provenance,
710 env_ref: registration.env_ref,
711 wake_target: registration.wake_target,
712 created_at_ms: now_ms,
713 updated_at_ms: now_ms,
714 external_ref: None,
715 wait: None,
716 status: ProcessStatus::Running,
717 }
718 }
719
720 pub fn is_terminal(&self) -> bool {
721 self.status.is_terminal()
722 }
723
724 pub fn originator_scope_id(&self) -> String {
725 self.provenance.originator.scope_id()
726 }
727}
728
/// Schema version number for process leases.
pub const PROCESS_LEASE_SCHEMA_VERSION: u32 = 1;
734
735#[derive(Clone, Debug, Serialize, Deserialize)]
749pub struct ProcessLease {
750 pub schema_version: u32,
751 pub process_id: ProcessId,
752 pub owner_id: String,
753 pub lease_token: String,
754 pub fencing_token: u64,
755 pub claimed_at_epoch_ms: u64,
756 pub expires_at_epoch_ms: u64,
757}
758
759#[derive(Clone, Debug, Serialize, Deserialize)]
760pub struct ProcessLeaseCompletion {
761 pub process_id: ProcessId,
762 pub lease_token: String,
763}
764
765impl ProcessLeaseCompletion {
766 pub fn from_lease(lease: &ProcessLease) -> Self {
767 Self {
768 process_id: lease.process_id.clone(),
769 lease_token: lease.lease_token.clone(),
770 }
771 }
772}
773
774#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq)]
776pub struct ProcessExternalRef {
777 pub backend: String,
778 pub id: String,
779 #[serde(default, skip_serializing_if = "Option::is_none")]
780 pub metadata: Option<serde_json::Value>,
781}
782
783#[derive(Clone, Debug, Default, PartialEq, Eq, Serialize, Deserialize)]
784pub struct ProcessHandleDescriptor {
785 #[serde(default, skip_serializing_if = "Option::is_none")]
786 pub kind: Option<String>,
787 #[serde(default, skip_serializing_if = "Option::is_none")]
788 pub label: Option<String>,
789}
790
791impl ProcessHandleDescriptor {
792 pub fn new(kind: Option<impl Into<String>>, label: Option<impl Into<String>>) -> Self {
793 Self {
794 kind: kind.map(Into::into),
795 label: label.map(Into::into),
796 }
797 }
798}
799
800#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
801pub struct ProcessHandleGrant {
802 pub session_id: String,
803 pub process_id: ProcessId,
804 pub descriptor: ProcessHandleDescriptor,
805}
806
807pub type ProcessHandleGrantEntry = (ProcessHandleGrant, ProcessRecord);
808
809#[derive(Clone, Copy, Debug, Default, PartialEq, Eq, Serialize, Deserialize)]
810#[serde(rename_all = "snake_case")]
811pub enum ProcessLifecycleStatus {
812 #[default]
813 Running,
814 Completed,
815 Failed,
816 Cancelled,
817}
818
819impl ProcessLifecycleStatus {
820 pub fn label(self) -> &'static str {
821 match self {
822 Self::Running => "running",
823 Self::Completed => "completed",
824 Self::Failed => "failed",
825 Self::Cancelled => "cancelled",
826 }
827 }
828
829 pub fn is_terminal(self) -> bool {
830 !matches!(self, Self::Running)
831 }
832
833 pub fn terminal_state(self) -> Option<ProcessTerminalState> {
834 match self {
835 Self::Running => None,
836 Self::Completed => Some(ProcessTerminalState::Completed),
837 Self::Failed => Some(ProcessTerminalState::Failed),
838 Self::Cancelled => Some(ProcessTerminalState::Cancelled),
839 }
840 }
841}
842
843impl From<&ProcessStatus> for ProcessLifecycleStatus {
844 fn from(status: &ProcessStatus) -> Self {
845 match status {
846 ProcessStatus::Running => Self::Running,
847 ProcessStatus::Completed { .. } => Self::Completed,
848 ProcessStatus::Failed { .. } => Self::Failed,
849 ProcessStatus::Cancelled { .. } => Self::Cancelled,
850 }
851 }
852}
853
854impl From<ProcessStatus> for ProcessLifecycleStatus {
855 fn from(status: ProcessStatus) -> Self {
856 Self::from(&status)
857 }
858}
859
860#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
861pub struct ProcessHandleSummary {
862 #[serde(rename = "__handle__")]
863 pub handle_type: String,
864 pub id: ProcessId,
865 pub process_id: ProcessId,
866 pub descriptor: ProcessHandleDescriptor,
867 #[serde(default, skip_serializing_if = "Option::is_none")]
868 pub definition: Option<lashlang::ProcessDefinitionIdentity>,
869 pub status: ProcessLifecycleStatus,
870}
871
872impl ProcessHandleSummary {
873 pub fn new(
874 process_id: impl Into<ProcessId>,
875 descriptor: ProcessHandleDescriptor,
876 status: ProcessLifecycleStatus,
877 ) -> Self {
878 let process_id = process_id.into();
879 Self {
880 handle_type: "process".to_string(),
881 id: process_id.clone(),
882 process_id,
883 descriptor,
884 definition: None,
885 status,
886 }
887 }
888
889 pub fn with_definition(
890 mut self,
891 definition: Option<lashlang::ProcessDefinitionIdentity>,
892 ) -> Self {
893 self.definition = definition;
894 self
895 }
896
897 pub fn from_grant_record(grant: ProcessHandleGrant, record: ProcessRecord) -> Self {
898 let definition = process_definition_identity_from_input(record.input.as_ref());
899 Self::new(
900 record.id,
901 grant.descriptor,
902 ProcessLifecycleStatus::from(record.status),
903 )
904 .with_definition(definition)
905 }
906}
907
908impl From<ProcessHandleGrantEntry> for ProcessHandleSummary {
909 fn from((grant, record): ProcessHandleGrantEntry) -> Self {
910 Self::from_grant_record(grant, record)
911 }
912}
913
914#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
915pub struct ProcessCancelSummary {
916 pub process_id: ProcessId,
917 pub status: ProcessLifecycleStatus,
918}
919
920impl ProcessCancelSummary {
921 pub fn from_record(record: ProcessRecord) -> Self {
922 Self {
923 process_id: record.id,
924 status: ProcessLifecycleStatus::from(record.status),
925 }
926 }
927}
928
929fn process_definition_identity_from_input(
930 input: &ProcessInput,
931) -> Option<lashlang::ProcessDefinitionIdentity> {
932 match input {
933 ProcessInput::LashlangProcess {
934 module_ref,
935 process_ref,
936 host_requirements_ref,
937 process_name,
938 ..
939 } => Some(lashlang::ProcessDefinitionIdentity::new(
940 module_ref.clone(),
941 host_requirements_ref.clone(),
942 process_ref.clone(),
943 process_name.clone(),
944 )),
945 ProcessInput::ToolCall { .. }
946 | ProcessInput::SessionTurn { .. }
947 | ProcessInput::External { .. } => None,
948 }
949}
950
/// Status facet of a process list filter. The default is `Running`.
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
pub enum ProcessStatusFilter {
    /// Match running processes only.
    #[default]
    Running,
    /// Match completed processes only.
    Completed,
    /// Match failed processes only.
    Failed,
    /// Match cancelled processes only.
    Cancelled,
    /// Match processes in any status.
    Any,
}
960
961impl ProcessStatusFilter {
962 pub fn decode(value: Option<&str>) -> Result<Self, String> {
963 match value.unwrap_or("running") {
964 "running" => Ok(Self::Running),
965 "completed" => Ok(Self::Completed),
966 "failed" => Ok(Self::Failed),
967 "cancelled" => Ok(Self::Cancelled),
968 "any" => Ok(Self::Any),
969 other => Err(format!(
970 "processes.list status must be `running`, `completed`, `failed`, `cancelled`, or `any`, got `{other}`"
971 )),
972 }
973 }
974
975 pub fn list_mode(self) -> ProcessListMode {
976 match self {
977 Self::Running => ProcessListMode::Live,
978 Self::Completed | Self::Failed | Self::Cancelled | Self::Any => ProcessListMode::All,
979 }
980 }
981
982 pub fn matches(self, status: ProcessLifecycleStatus) -> bool {
983 match self {
984 Self::Running => status == ProcessLifecycleStatus::Running,
985 Self::Completed => status == ProcessLifecycleStatus::Completed,
986 Self::Failed => status == ProcessLifecycleStatus::Failed,
987 Self::Cancelled => status == ProcessLifecycleStatus::Cancelled,
988 Self::Any => true,
989 }
990 }
991}
992
993#[derive(Clone, Debug, Default, PartialEq, Eq)]
994pub struct ProcessListFilter {
995 pub definition: Option<lashlang::ProcessDefinitionIdentity>,
996 pub status: ProcessStatusFilter,
997 pub waiting: Option<bool>,
998}
999
1000impl ProcessListFilter {
1001 pub fn decode(args: &serde_json::Value) -> Result<Self, String> {
1002 let map = args
1003 .as_object()
1004 .ok_or_else(|| "processes.list expects a record of process filters".to_string())?;
1005 for key in map.keys() {
1006 match key.as_str() {
1007 "definition" | "status" | "waiting" => {}
1008 _ => return Err(format!("processes.list unknown filter `{key}`")),
1009 }
1010 }
1011 let definition = args
1012 .get("definition")
1013 .map(lashlang::ProcessDefinitionIdentity::from_process_value)
1014 .transpose()
1015 .map_err(|err| err.to_string())?;
1016 let status =
1017 ProcessStatusFilter::decode(args.get("status").and_then(serde_json::Value::as_str))?;
1018 let waiting = args
1019 .get("waiting")
1020 .map(|value| {
1021 value
1022 .as_bool()
1023 .ok_or_else(|| "processes.list `waiting` filter must be a boolean".to_string())
1024 })
1025 .transpose()?;
1026 Ok(Self {
1027 definition,
1028 status,
1029 waiting,
1030 })
1031 }
1032
1033 pub fn list_mode(&self) -> ProcessListMode {
1034 self.status.list_mode()
1035 }
1036
1037 pub fn matches_entry(&self, entry: &ProcessHandleGrantEntry) -> bool {
1038 let (_grant, record) = entry;
1039 self.matches_record(record)
1040 }
1041
1042 pub fn matches_record(&self, record: &ProcessRecord) -> bool {
1043 let status = ProcessLifecycleStatus::from(&record.status);
1044 self.status.matches(status)
1045 && self
1046 .definition
1047 .as_ref()
1048 .is_none_or(|definition| match record.input.as_ref() {
1049 ProcessInput::LashlangProcess {
1050 module_ref,
1051 process_ref,
1052 host_requirements_ref,
1053 process_name,
1054 ..
1055 } => definition.matches_input_refs(
1056 module_ref,
1057 host_requirements_ref,
1058 process_ref,
1059 process_name,
1060 ),
1061 ProcessInput::ToolCall { .. }
1062 | ProcessInput::SessionTurn { .. }
1063 | ProcessInput::External { .. } => false,
1064 })
1065 && self
1066 .waiting
1067 .is_none_or(|waiting| record.wait.is_some() == waiting)
1068 }
1069}
1070
1071#[derive(Clone, Copy, Debug, Default, PartialEq, Eq, Serialize, Deserialize)]
1072#[serde(rename_all = "snake_case")]
1073pub enum ProcessListMode {
1074 #[default]
1075 Live,
1076 All,
1077}
1078
1079impl ProcessListMode {
1080 pub fn as_str(self) -> &'static str {
1081 match self {
1082 Self::Live => "live",
1083 Self::All => "all",
1084 }
1085 }
1086}
1087
1088#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
1089pub struct ProcessStartGrant {
1090 pub session_scope: SessionScope,
1091 pub descriptor: ProcessHandleDescriptor,
1092}
1093
1094#[derive(Clone, Debug, Default, PartialEq, Eq, Serialize, Deserialize)]
1095pub struct ProcessSessionDeleteReport {
1096 pub session_id: String,
1097 pub revoked_handle_count: usize,
1098 pub deleted_wake_count: usize,
1099 pub orphaned_process_ids: Vec<String>,
1100 pub preserved_process_ids: Vec<String>,
1101}
1102
#[cfg(test)]
mod tests {
    use serde_json::json;

    use super::*;

    /// Builds a process reference for `component` at position `pos`.
    ///
    /// `pos` is taken as `u32` directly so no narrowing cast is needed.
    fn process_ref(component: &str, pos: u32) -> lashlang::ProcessRef {
        lashlang::ProcessRef {
            component: lashlang::ContentHash::new(component),
            pos,
        }
    }

    /// Builds the JSON process value that `ProcessListFilter::decode` accepts
    /// under the `definition` key.
    fn process_value(
        module_ref: &lashlang::ModuleRef,
        host_requirements_ref: &lashlang::HostRequirementsRef,
        process_ref: &lashlang::ProcessRef,
        name: &str,
    ) -> serde_json::Value {
        let mut value = serde_json::Map::new();
        value.insert(lashlang::LASH_PROCESS_VALUE_KEY.to_string(), json!(true));
        value.insert(lashlang::LASH_MODULE_REF_KEY.to_string(), json!(module_ref));
        value.insert(
            lashlang::LASH_HOST_REQUIREMENTS_REF_KEY.to_string(),
            json!(host_requirements_ref),
        );
        value.insert(
            lashlang::LASH_PROCESS_REF_KEY.to_string(),
            json!(process_ref),
        );
        value.insert(lashlang::LASH_PROCESS_NAME_KEY.to_string(), json!(name));
        serde_json::Value::Object(value)
    }

    /// A completed status carrying a successful `true` tool output.
    fn completed_status() -> ProcessStatus {
        ProcessStatus::Completed {
            await_output: ProcessAwaitOutput::from_tool_output(crate::ToolCallOutput::success(
                json!(true),
            )),
        }
    }

    /// Builds a `(grant, record)` entry for a host-originated lashlang process
    /// and then forces the record into `status`.
    fn lashlang_entry(
        process_id: &str,
        module_ref: lashlang::ModuleRef,
        host_requirements_ref: lashlang::HostRequirementsRef,
        process_ref: lashlang::ProcessRef,
        process_name: &str,
        status: ProcessStatus,
    ) -> ProcessHandleGrantEntry {
        let mut record = ProcessRecord::from_registration(
            ProcessRegistration::new(
                process_id,
                ProcessInput::LashlangProcess {
                    module_ref,
                    process_ref,
                    host_requirements_ref,
                    process_name: process_name.to_string(),
                    args: serde_json::Map::new(),
                },
                ProcessProvenance::host(),
            )
            .with_execution_env_ref(Some(ProcessExecutionEnvRef::new(format!(
                "process-env:test:{process_id}"
            )))),
        );
        record.status = status;
        (
            ProcessHandleGrant {
                session_id: "session".to_string(),
                process_id: process_id.to_string(),
                descriptor: ProcessHandleDescriptor::new(Some("lashlang"), Some(process_name)),
            },
            record,
        )
    }

    #[test]
    fn process_list_filter_matches_definition_and_status() {
        let module_ref = lashlang::ModuleRef::new(&lashlang::ContentHash::new("module"));
        let host_requirements_ref =
            lashlang::HostRequirementsRef::new(&lashlang::ContentHash::new("surface"));
        let target_ref = process_ref("target", 0);
        let other_ref = process_ref("other", 1);
        let filter = ProcessListFilter::decode(&json!({
            "definition": process_value(&module_ref, &host_requirements_ref, &target_ref, "target"),
            "status": "completed"
        }))
        .expect("decode filter");

        let matching = lashlang_entry(
            "matching",
            module_ref.clone(),
            host_requirements_ref.clone(),
            target_ref,
            "target",
            completed_status(),
        );
        let wrong_definition = lashlang_entry(
            "wrong-definition",
            module_ref,
            host_requirements_ref,
            other_ref,
            "other",
            completed_status(),
        );

        assert_eq!(filter.list_mode(), ProcessListMode::All);
        assert!(filter.matches_entry(&matching));
        assert!(!filter.matches_entry(&wrong_definition));
    }

    #[test]
    fn process_list_filter_matches_waiting_facet() {
        let module_ref = lashlang::ModuleRef::new(&lashlang::ContentHash::new("module"));
        let host_requirements_ref =
            lashlang::HostRequirementsRef::new(&lashlang::ContentHash::new("surface"));
        // Named `target_ref` so the local does not shadow the `process_ref` helper.
        let target_ref = process_ref("target", 0);
        let mut waiting_entry = lashlang_entry(
            "waiting",
            module_ref.clone(),
            host_requirements_ref.clone(),
            target_ref.clone(),
            "target",
            ProcessStatus::Running,
        );
        waiting_entry.1.wait = Some(WaitState {
            since_ms: 42,
            kind: WaitKind::Signal {
                name: "ready".to_string(),
                event_type: "signal.ready".to_string(),
                key: "process:waiting:signal.ready:1".to_string(),
                ordinal: 1,
            },
        });
        let idle_entry = lashlang_entry(
            "idle",
            module_ref,
            host_requirements_ref,
            target_ref,
            "target",
            ProcessStatus::Running,
        );
        let waiting_filter =
            ProcessListFilter::decode(&json!({ "waiting": true })).expect("decode waiting filter");
        let idle_filter =
            ProcessListFilter::decode(&json!({ "waiting": false })).expect("decode idle filter");

        assert_eq!(waiting_filter.list_mode(), ProcessListMode::Live);
        assert!(waiting_filter.matches_entry(&waiting_entry));
        assert!(!waiting_filter.matches_entry(&idle_entry));
        assert!(!idle_filter.matches_entry(&waiting_entry));
        assert!(idle_filter.matches_entry(&idle_entry));
        assert!(
            ProcessListFilter::decode(&json!({ "waiting": "yes" }))
                .expect_err("invalid waiting filter")
                .contains("must be a boolean")
        );
    }
}
1262}