1use std::fmt;
2use std::sync::Arc;
3
4use serde::{Deserialize, Serialize};
5
6use super::events::{
7 ProcessAwaitOutput, ProcessEventType, ProcessTerminalSemantics, ProcessTerminalState,
8 default_process_event_types,
9};
10use super::time::current_epoch_ms;
11use super::validation::{
12 ensure_core_event_types, process_registration_hash, validate_process_registration,
13};
14
15pub type ProcessId = String;
16
17#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize)]
18#[serde(transparent)]
19pub struct SessionScopeId(String);
20
21impl SessionScopeId {
22 pub fn new(value: impl Into<String>) -> Self {
23 Self(value.into())
24 }
25
26 pub fn as_str(&self) -> &str {
27 &self.0
28 }
29}
30
31impl fmt::Display for SessionScopeId {
32 fn fmt(&self, formatter: &mut fmt::Formatter<'_>) -> fmt::Result {
33 formatter.write_str(&self.0)
34 }
35}
36
37impl From<String> for SessionScopeId {
38 fn from(value: String) -> Self {
39 Self::new(value)
40 }
41}
42
43impl From<&str> for SessionScopeId {
44 fn from(value: &str) -> Self {
45 Self::new(value)
46 }
47}
48
49#[derive(Debug, Serialize, Deserialize)]
51#[serde(tag = "type", rename_all = "snake_case")]
52pub enum ProcessInput {
53 ToolCall {
54 call: crate::PreparedToolCall,
55 },
56 LashlangProcess {
57 module_ref: lashlang::ModuleRef,
58 process_ref: lashlang::ProcessRef,
59 required_surface_ref: lashlang::RequiredSurfaceRef,
60 process_name: String,
61 #[serde(default)]
62 args: serde_json::Map<String, serde_json::Value>,
63 },
64 SessionTurn {
65 create_request: Box<crate::SessionCreateRequest>,
66 turn_input: Box<crate::TurnInput>,
67 output_contract: crate::ToolOutputContract,
68 },
69 External {
70 #[serde(default)]
71 metadata: serde_json::Value,
72 },
73}
74
75impl Clone for ProcessInput {
76 fn clone(&self) -> Self {
77 match self {
78 Self::ToolCall { call } => Self::ToolCall { call: call.clone() },
79 Self::LashlangProcess {
80 module_ref,
81 process_ref,
82 required_surface_ref,
83 process_name,
84 args,
85 } => Self::LashlangProcess {
86 module_ref: module_ref.clone(),
87 process_ref: process_ref.clone(),
88 required_surface_ref: required_surface_ref.clone(),
89 process_name: process_name.clone(),
90 args: args.clone(),
91 },
92 Self::SessionTurn {
93 create_request,
94 turn_input,
95 output_contract,
96 } => Self::SessionTurn {
97 create_request: create_request.clone(),
98 turn_input: turn_input.clone(),
99 output_contract: output_contract.clone(),
100 },
101 Self::External { metadata } => Self::External {
102 metadata: metadata.clone(),
103 },
104 }
105 }
106}
107
108#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
109#[serde(transparent)]
110pub struct ProcessExecutionEnvRef(String);
111
112impl ProcessExecutionEnvRef {
113 pub fn new(value: impl Into<String>) -> Self {
114 Self(value.into())
115 }
116
117 pub fn as_str(&self) -> &str {
118 &self.0
119 }
120}
121
122impl fmt::Display for ProcessExecutionEnvRef {
123 fn fmt(&self, formatter: &mut fmt::Formatter<'_>) -> fmt::Result {
124 formatter.write_str(&self.0)
125 }
126}
127
128#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
129pub struct ProcessExecutionEnvSpec {
130 #[serde(default)]
131 pub plugin_options: crate::PluginOptions,
132 #[serde(default)]
133 pub policy: crate::SessionPolicy,
134}
135
136impl ProcessExecutionEnvSpec {
137 pub fn new(plugin_options: crate::PluginOptions, policy: crate::SessionPolicy) -> Self {
138 Self {
139 plugin_options,
140 policy,
141 }
142 }
143
144 pub fn stable_ref(&self) -> Result<ProcessExecutionEnvRef, serde_json::Error> {
145 crate::stable_hash::stable_json_sha256_hex(self)
146 .map(|hash| ProcessExecutionEnvRef::new(format!("process-env:sha256:{hash}")))
147 }
148
149 pub fn to_store_bytes(&self) -> Result<Vec<u8>, serde_json::Error> {
150 serde_json::to_vec(self)
151 }
152
153 pub fn from_store_bytes(bytes: &[u8]) -> Result<Self, serde_json::Error> {
154 serde_json::from_slice(bytes)
155 }
156}
157
158pub async fn persist_process_execution_env(
159 artifact_store: &dyn lashlang::LashlangArtifactStore,
160 spec: &ProcessExecutionEnvSpec,
161) -> Result<ProcessExecutionEnvRef, crate::PluginError> {
162 let env_ref = spec.stable_ref().map_err(|err| {
163 crate::PluginError::Session(format!("failed to hash process execution env: {err}"))
164 })?;
165 let bytes = spec.to_store_bytes().map_err(|err| {
166 crate::PluginError::Session(format!("failed to encode process execution env: {err}"))
167 })?;
168 artifact_store
169 .put_artifact_bytes(env_ref.as_str(), "process_execution_env", &bytes)
170 .await
171 .map_err(|err| {
172 crate::PluginError::Session(format!(
173 "failed to persist process execution env `{env_ref}`: {err}"
174 ))
175 })?;
176 Ok(env_ref)
177}
178
179pub async fn load_process_execution_env(
180 artifact_store: &dyn lashlang::LashlangArtifactStore,
181 env_ref: &ProcessExecutionEnvRef,
182) -> Result<ProcessExecutionEnvSpec, crate::PluginError> {
183 let bytes = artifact_store
184 .get_artifact_bytes(env_ref.as_str())
185 .await
186 .map_err(|err| {
187 crate::PluginError::Session(format!(
188 "failed to load process execution env `{env_ref}`: {err}"
189 ))
190 })?
191 .ok_or_else(|| {
192 crate::PluginError::Session(format!("missing process execution env `{env_ref}`"))
193 })?;
194 ProcessExecutionEnvSpec::from_store_bytes(&bytes).map_err(|err| {
195 crate::PluginError::Session(format!(
196 "failed to decode process execution env `{env_ref}`: {err}"
197 ))
198 })
199}
200
201#[derive(Clone, Debug, Default, Serialize, Deserialize)]
204pub struct ProcessExecutionContext {
205 #[serde(default, skip_serializing_if = "Option::is_none")]
206 pub causal_invocation: Option<crate::RuntimeInvocation>,
207}
208
209impl ProcessExecutionContext {
210 pub fn with_causal_invocation(mut self, invocation: Option<crate::RuntimeInvocation>) -> Self {
211 self.causal_invocation = invocation;
212 self
213 }
214
215 pub fn is_empty(&self) -> bool {
216 self.causal_invocation.is_none()
217 }
218}
219
220#[derive(Clone)]
221pub struct ProcessOpScope<'scope> {
222 pub(crate) parent_invocation: Option<crate::RuntimeInvocation>,
223 pub(crate) effect_controller: crate::runtime::RuntimeEffectControllerHandle<'scope>,
224 pub(crate) agent_frame_id: Option<crate::AgentFrameId>,
225 pub(crate) target_agent_frame_id: Option<crate::AgentFrameId>,
226}
227
228impl<'scope> ProcessOpScope<'scope> {
229 pub fn new(scoped_effect_controller: crate::ScopedEffectController<'scope>) -> Self {
230 Self {
231 parent_invocation: None,
232 effect_controller: crate::runtime::RuntimeEffectControllerHandle::borrowed(
233 scoped_effect_controller,
234 ),
235 agent_frame_id: None,
236 target_agent_frame_id: None,
237 }
238 }
239
240 pub fn with_parent_invocation(
241 mut self,
242 parent_invocation: Option<crate::RuntimeInvocation>,
243 ) -> Self {
244 self.parent_invocation = parent_invocation;
245 self
246 }
247
248 pub fn with_agent_frame_id(mut self, agent_frame_id: Option<crate::AgentFrameId>) -> Self {
249 self.agent_frame_id = agent_frame_id;
250 self
251 }
252
253 pub fn with_target_agent_frame_id(
254 mut self,
255 agent_frame_id: Option<crate::AgentFrameId>,
256 ) -> Self {
257 self.target_agent_frame_id = agent_frame_id;
258 self
259 }
260
261 pub fn agent_frame_id(&self) -> Option<&str> {
262 self.agent_frame_id.as_deref()
263 }
264
265 pub fn target_agent_frame_id(&self) -> Option<&str> {
266 self.target_agent_frame_id.as_deref()
267 }
268
269 pub(crate) fn controller(&self) -> &dyn crate::RuntimeEffectController {
270 self.effect_controller.controller()
271 }
272}
273
274#[derive(Clone, Debug, Default)]
275pub struct ProcessStartOptions {
276 pub descriptor: Option<ProcessHandleDescriptor>,
277}
278
279impl ProcessStartOptions {
280 pub fn new() -> Self {
281 Self::default()
282 }
283
284 pub fn with_descriptor(mut self, descriptor: ProcessHandleDescriptor) -> Self {
285 self.descriptor = Some(descriptor);
286 self
287 }
288
289 pub fn with_optional_descriptor(mut self, descriptor: Option<ProcessHandleDescriptor>) -> Self {
290 self.descriptor = descriptor;
291 self
292 }
293
294 pub fn execution_context(&self, scope: &ProcessOpScope<'_>) -> ProcessExecutionContext {
295 ProcessExecutionContext {
296 causal_invocation: scope.parent_invocation.clone(),
297 }
298 }
299}
300
301#[derive(Clone, Debug, Serialize, Deserialize)]
303pub struct ProcessStartRequest {
304 pub id: ProcessId,
305 pub input: ProcessInput,
306 #[serde(default, skip_serializing_if = "Option::is_none")]
307 pub env_spec: Option<ProcessExecutionEnvSpec>,
308 pub originator: ProcessOriginator,
309 #[serde(default, skip_serializing_if = "Option::is_none")]
310 pub wake_target: Option<SessionScope>,
311 #[serde(default, skip_serializing_if = "Option::is_none")]
312 pub grant: Option<ProcessStartGrant>,
313 #[serde(default)]
314 pub event_types: Vec<ProcessEventType>,
315}
316
317impl ProcessStartRequest {
318 pub fn new(
319 id: impl Into<ProcessId>,
320 input: ProcessInput,
321 originator: ProcessOriginator,
322 ) -> Self {
323 Self {
324 id: id.into(),
325 input,
326 env_spec: None,
327 originator,
328 wake_target: None,
329 grant: None,
330 event_types: default_process_event_types(),
331 }
332 }
333
334 pub fn external(
335 id: impl Into<ProcessId>,
336 originator: ProcessOriginator,
337 metadata: serde_json::Value,
338 ) -> Self {
339 Self::new(id, ProcessInput::External { metadata }, originator)
340 }
341
342 pub fn with_env_spec(mut self, env_spec: ProcessExecutionEnvSpec) -> Self {
343 self.env_spec = Some(env_spec);
344 self
345 }
346
347 pub fn with_wake_target(mut self, wake_target: Option<SessionScope>) -> Self {
348 self.wake_target = wake_target;
349 self
350 }
351
352 pub fn with_grant(mut self, grant: Option<ProcessStartGrant>) -> Self {
353 self.grant = grant;
354 self
355 }
356
357 pub fn with_event_types(
358 mut self,
359 event_types: impl IntoIterator<Item = ProcessEventType>,
360 ) -> Self {
361 self.event_types = event_types.into_iter().collect();
362 self
363 }
364
365 pub fn with_extra_event_types(
366 mut self,
367 event_types: impl IntoIterator<Item = ProcessEventType>,
368 ) -> Self {
369 self.event_types.extend(event_types);
370 self
371 }
372
373 pub fn into_registration(
374 self,
375 host_profile_id: impl Into<String>,
376 env_ref: Option<ProcessExecutionEnvRef>,
377 ) -> ProcessRegistration {
378 ProcessRegistration::new(
379 self.id,
380 self.input,
381 ProcessProvenance::new(self.originator, host_profile_id),
382 )
383 .with_event_types(self.event_types)
384 .with_execution_env_ref(env_ref)
385 .with_wake_target(self.wake_target)
386 }
387}
388
389#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
390pub struct SessionScope {
391 pub session_id: String,
392 #[serde(default, skip_serializing_if = "Option::is_none")]
393 pub agent_frame_id: Option<crate::AgentFrameId>,
394}
395
396#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
397pub struct ProcessProvenance {
398 pub originator: ProcessOriginator,
399 pub host_profile_id: String,
400 #[serde(default, skip_serializing_if = "Option::is_none")]
401 pub caused_by: Option<crate::CausalRef>,
402}
403
404impl ProcessProvenance {
405 pub fn new(originator: ProcessOriginator, host_profile_id: impl Into<String>) -> Self {
406 Self {
407 originator,
408 host_profile_id: host_profile_id.into(),
409 caused_by: None,
410 }
411 }
412
413 pub fn host(host_profile_id: impl Into<String>) -> Self {
414 Self::new(ProcessOriginator::host(), host_profile_id)
415 }
416
417 pub fn session(scope: SessionScope, host_profile_id: impl Into<String>) -> Self {
418 Self::new(ProcessOriginator::session(scope), host_profile_id)
419 }
420
421 pub fn with_caused_by(mut self, caused_by: Option<crate::CausalRef>) -> Self {
422 self.caused_by = caused_by;
423 self
424 }
425}
426
427#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
428#[serde(tag = "type", rename_all = "snake_case")]
429pub enum ProcessOriginator {
430 Host,
431 Session { scope: SessionScope },
432}
433
434impl ProcessOriginator {
435 pub fn host() -> Self {
436 Self::Host
437 }
438
439 pub fn session(scope: SessionScope) -> Self {
440 Self::Session { scope }
441 }
442
443 pub fn scope_id(&self) -> String {
444 match self {
445 Self::Host => "host".to_string(),
446 Self::Session { scope } => scope.id().to_string(),
447 }
448 }
449}
450
451impl SessionScope {
452 pub fn new(session_id: impl Into<String>) -> Self {
453 Self {
454 session_id: session_id.into(),
455 agent_frame_id: None,
456 }
457 }
458
459 pub fn for_agent_frame(
460 session_id: impl Into<String>,
461 agent_frame_id: impl Into<crate::AgentFrameId>,
462 ) -> Self {
463 Self {
464 session_id: session_id.into(),
465 agent_frame_id: Some(agent_frame_id.into()),
466 }
467 }
468
469 pub fn id(&self) -> SessionScopeId {
470 match self.agent_frame_id.as_deref() {
471 Some(frame_id) if !frame_id.is_empty() => {
472 SessionScopeId::new(format!("session:{}/frame:{frame_id}", self.session_id))
473 }
474 _ => SessionScopeId::new(format!("session:{}", self.session_id)),
475 }
476 }
477
478 pub fn is_empty(&self) -> bool {
479 self.session_id.is_empty()
480 }
481}
482
483#[derive(Debug, Serialize, Deserialize)]
485pub struct ProcessRegistration {
486 pub id: ProcessId,
487 pub input: Arc<ProcessInput>,
488 #[serde(default)]
489 pub event_types: Vec<ProcessEventType>,
490 pub provenance: ProcessProvenance,
491 #[serde(default, skip_serializing_if = "Option::is_none")]
492 pub env_ref: Option<ProcessExecutionEnvRef>,
493 #[serde(default, skip_serializing_if = "Option::is_none")]
494 pub wake_target: Option<SessionScope>,
495}
496
497impl Clone for ProcessRegistration {
498 fn clone(&self) -> Self {
499 Self {
500 id: self.id.clone(),
501 input: Arc::clone(&self.input),
502 event_types: self.event_types.clone(),
503 provenance: self.provenance.clone(),
504 env_ref: self.env_ref.clone(),
505 wake_target: self.wake_target.clone(),
506 }
507 }
508}
509
510impl ProcessRegistration {
511 pub fn new(
512 id: impl Into<ProcessId>,
513 input: ProcessInput,
514 provenance: ProcessProvenance,
515 ) -> Self {
516 Self {
517 id: id.into(),
518 input: Arc::new(input),
519 event_types: default_process_event_types(),
520 provenance,
521 env_ref: None,
522 wake_target: None,
523 }
524 }
525
526 pub(crate) fn session_start_draft(id: impl Into<ProcessId>, input: ProcessInput) -> Self {
527 Self::new(id, input, ProcessProvenance::host("session-start-draft"))
528 }
529
530 pub fn with_process_provenance(mut self, provenance: ProcessProvenance) -> Self {
531 self.provenance = provenance;
532 self
533 }
534
535 pub fn with_execution_env_ref(mut self, env_ref: Option<ProcessExecutionEnvRef>) -> Self {
536 self.env_ref = env_ref;
537 self
538 }
539
540 pub fn with_wake_target(mut self, wake_target: Option<SessionScope>) -> Self {
541 self.wake_target = wake_target;
542 self
543 }
544
545 pub fn with_event_types(
546 mut self,
547 event_types: impl IntoIterator<Item = ProcessEventType>,
548 ) -> Self {
549 self.event_types = event_types.into_iter().collect();
550 self
551 }
552
553 pub fn with_extra_event_types(
554 mut self,
555 event_types: impl IntoIterator<Item = ProcessEventType>,
556 ) -> Self {
557 self.event_types.extend(event_types);
558 self
559 }
560}
561
562#[derive(Clone, Debug, Default, PartialEq, Serialize, Deserialize)]
563#[serde(tag = "state", rename_all = "snake_case")]
564pub enum ProcessStatus {
565 #[default]
566 Running,
567 Completed {
568 await_output: ProcessAwaitOutput,
569 },
570 Failed {
571 await_output: ProcessAwaitOutput,
572 },
573 Cancelled {
574 await_output: ProcessAwaitOutput,
575 },
576}
577
578impl ProcessStatus {
579 pub fn from_terminal(terminal: ProcessTerminalSemantics) -> Self {
580 match terminal.state {
581 ProcessTerminalState::Completed => Self::Completed {
582 await_output: terminal.await_output,
583 },
584 ProcessTerminalState::Failed => Self::Failed {
585 await_output: terminal.await_output,
586 },
587 ProcessTerminalState::Cancelled => Self::Cancelled {
588 await_output: terminal.await_output,
589 },
590 }
591 }
592
593 pub fn is_terminal(&self) -> bool {
594 !matches!(self, Self::Running)
595 }
596
597 pub fn label(&self) -> &'static str {
598 match self {
599 Self::Running => "running",
600 Self::Completed { .. } => "completed",
601 Self::Failed { .. } => "failed",
602 Self::Cancelled { .. } => "cancelled",
603 }
604 }
605
606 pub fn terminal_state(&self) -> Option<ProcessTerminalState> {
607 match self {
608 Self::Running => None,
609 Self::Completed { .. } => Some(ProcessTerminalState::Completed),
610 Self::Failed { .. } => Some(ProcessTerminalState::Failed),
611 Self::Cancelled { .. } => Some(ProcessTerminalState::Cancelled),
612 }
613 }
614
615 pub fn await_output(&self) -> Option<&ProcessAwaitOutput> {
616 match self {
617 Self::Running => None,
618 Self::Completed { await_output }
619 | Self::Failed { await_output }
620 | Self::Cancelled { await_output } => Some(await_output),
621 }
622 }
623
624 pub fn terminal_semantics(&self) -> Option<ProcessTerminalSemantics> {
625 Some(ProcessTerminalSemantics {
626 state: self.terminal_state()?,
627 await_output: self.await_output()?.clone(),
628 })
629 }
630}
631
632#[derive(Clone, Debug, Serialize, Deserialize)]
635pub struct ProcessRecord {
636 pub id: ProcessId,
637 pub registration_hash: String,
638 pub input: Arc<ProcessInput>,
639 #[serde(default)]
640 pub event_types: Vec<ProcessEventType>,
641 pub provenance: ProcessProvenance,
642 #[serde(default, skip_serializing_if = "Option::is_none")]
643 pub env_ref: Option<ProcessExecutionEnvRef>,
644 #[serde(default, skip_serializing_if = "Option::is_none")]
645 pub wake_target: Option<SessionScope>,
646 #[serde(default)]
647 pub created_at_ms: u64,
648 #[serde(default)]
649 pub updated_at_ms: u64,
650 #[serde(default, skip_serializing_if = "Option::is_none")]
651 pub external_ref: Option<ProcessExternalRef>,
652 #[serde(default, skip_serializing_if = "Option::is_none")]
653 pub wait: Option<WaitState>,
654 #[serde(default)]
655 pub status: ProcessStatus,
656}
657
658#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
659pub struct WaitState {
660 pub kind: WaitKind,
661 pub since_ms: u64,
662}
663
664#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
665#[serde(tag = "kind", rename_all = "snake_case")]
666pub enum WaitKind {
667 Signal {
668 name: String,
669 event_type: String,
670 key: String,
671 ordinal: u64,
672 },
673}
674
675impl ProcessRecord {
676 pub fn from_registration(mut registration: ProcessRegistration) -> Self {
677 ensure_core_event_types(&mut registration);
678 validate_process_registration(®istration)
679 .expect("process registration should be valid before record construction");
680 let registration_hash = process_registration_hash(®istration)
681 .expect("process registration should hash before record construction");
682 Self::from_prepared_registration(registration, registration_hash, current_epoch_ms())
683 }
684
685 pub fn from_prepared_registration(
686 registration: ProcessRegistration,
687 registration_hash: String,
688 now_ms: u64,
689 ) -> Self {
690 Self {
691 id: registration.id,
692 registration_hash,
693 input: registration.input,
694 event_types: registration.event_types,
695 provenance: registration.provenance,
696 env_ref: registration.env_ref,
697 wake_target: registration.wake_target,
698 created_at_ms: now_ms,
699 updated_at_ms: now_ms,
700 external_ref: None,
701 wait: None,
702 status: ProcessStatus::Running,
703 }
704 }
705
706 pub fn is_terminal(&self) -> bool {
707 self.status.is_terminal()
708 }
709
710 pub fn originator_scope_id(&self) -> String {
711 self.provenance.originator.scope_id()
712 }
713
714 pub fn host_profile_id(&self) -> &str {
715 &self.provenance.host_profile_id
716 }
717}
718
719pub const PROCESS_LEASE_SCHEMA_VERSION: u32 = 1;
724
725#[derive(Clone, Debug, Serialize, Deserialize)]
739pub struct ProcessLease {
740 pub schema_version: u32,
741 pub process_id: ProcessId,
742 pub owner_id: String,
743 pub lease_token: String,
744 pub fencing_token: u64,
745 pub claimed_at_epoch_ms: u64,
746 pub expires_at_epoch_ms: u64,
747}
748
749#[derive(Clone, Debug, Serialize, Deserialize)]
750pub struct ProcessLeaseCompletion {
751 pub process_id: ProcessId,
752 pub lease_token: String,
753}
754
755impl ProcessLeaseCompletion {
756 pub fn from_lease(lease: &ProcessLease) -> Self {
757 Self {
758 process_id: lease.process_id.clone(),
759 lease_token: lease.lease_token.clone(),
760 }
761 }
762}
763
764#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq)]
766pub struct ProcessExternalRef {
767 pub backend: String,
768 pub id: String,
769 #[serde(default, skip_serializing_if = "Option::is_none")]
770 pub metadata: Option<serde_json::Value>,
771}
772
773#[derive(Clone, Debug, Default, PartialEq, Eq, Serialize, Deserialize)]
774pub struct ProcessHandleDescriptor {
775 #[serde(default, skip_serializing_if = "Option::is_none")]
776 pub kind: Option<String>,
777 #[serde(default, skip_serializing_if = "Option::is_none")]
778 pub label: Option<String>,
779}
780
781impl ProcessHandleDescriptor {
782 pub fn new(kind: Option<impl Into<String>>, label: Option<impl Into<String>>) -> Self {
783 Self {
784 kind: kind.map(Into::into),
785 label: label.map(Into::into),
786 }
787 }
788}
789
790#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
791pub struct ProcessHandleGrant {
792 pub session_id: String,
793 pub process_id: ProcessId,
794 pub descriptor: ProcessHandleDescriptor,
795}
796
797pub type ProcessHandleGrantEntry = (ProcessHandleGrant, ProcessRecord);
798
799#[derive(Clone, Copy, Debug, Default, PartialEq, Eq, Serialize, Deserialize)]
800#[serde(rename_all = "snake_case")]
801pub enum ProcessLifecycleStatus {
802 #[default]
803 Running,
804 Completed,
805 Failed,
806 Cancelled,
807}
808
809impl ProcessLifecycleStatus {
810 pub fn label(self) -> &'static str {
811 match self {
812 Self::Running => "running",
813 Self::Completed => "completed",
814 Self::Failed => "failed",
815 Self::Cancelled => "cancelled",
816 }
817 }
818
819 pub fn is_terminal(self) -> bool {
820 !matches!(self, Self::Running)
821 }
822
823 pub fn terminal_state(self) -> Option<ProcessTerminalState> {
824 match self {
825 Self::Running => None,
826 Self::Completed => Some(ProcessTerminalState::Completed),
827 Self::Failed => Some(ProcessTerminalState::Failed),
828 Self::Cancelled => Some(ProcessTerminalState::Cancelled),
829 }
830 }
831}
832
833impl From<&ProcessStatus> for ProcessLifecycleStatus {
834 fn from(status: &ProcessStatus) -> Self {
835 match status {
836 ProcessStatus::Running => Self::Running,
837 ProcessStatus::Completed { .. } => Self::Completed,
838 ProcessStatus::Failed { .. } => Self::Failed,
839 ProcessStatus::Cancelled { .. } => Self::Cancelled,
840 }
841 }
842}
843
844impl From<ProcessStatus> for ProcessLifecycleStatus {
845 fn from(status: ProcessStatus) -> Self {
846 Self::from(&status)
847 }
848}
849
850#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
851pub struct ProcessHandleSummary {
852 #[serde(rename = "__handle__")]
853 pub handle_type: String,
854 pub id: ProcessId,
855 pub process_id: ProcessId,
856 pub descriptor: ProcessHandleDescriptor,
857 #[serde(default, skip_serializing_if = "Option::is_none")]
858 pub definition: Option<ProcessDefinitionSummary>,
859 pub status: ProcessLifecycleStatus,
860}
861
862impl ProcessHandleSummary {
863 pub fn new(
864 process_id: impl Into<ProcessId>,
865 descriptor: ProcessHandleDescriptor,
866 status: ProcessLifecycleStatus,
867 ) -> Self {
868 let process_id = process_id.into();
869 Self {
870 handle_type: "process".to_string(),
871 id: process_id.clone(),
872 process_id,
873 descriptor,
874 definition: None,
875 status,
876 }
877 }
878
879 pub fn with_definition(mut self, definition: Option<ProcessDefinitionSummary>) -> Self {
880 self.definition = definition;
881 self
882 }
883
884 pub fn from_grant_record(grant: ProcessHandleGrant, record: ProcessRecord) -> Self {
885 let definition = ProcessDefinitionSummary::from_input(record.input.as_ref());
886 Self::new(
887 record.id,
888 grant.descriptor,
889 ProcessLifecycleStatus::from(record.status),
890 )
891 .with_definition(definition)
892 }
893}
894
895impl From<ProcessHandleGrantEntry> for ProcessHandleSummary {
896 fn from((grant, record): ProcessHandleGrantEntry) -> Self {
897 Self::from_grant_record(grant, record)
898 }
899}
900
901#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
902pub struct ProcessCancelSummary {
903 pub process_id: ProcessId,
904 pub status: ProcessLifecycleStatus,
905}
906
907impl ProcessCancelSummary {
908 pub fn from_record(record: ProcessRecord) -> Self {
909 Self {
910 process_id: record.id,
911 status: ProcessLifecycleStatus::from(record.status),
912 }
913 }
914}
915
916#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
917pub struct ProcessDefinitionSummary {
918 pub name: String,
919}
920
921impl ProcessDefinitionSummary {
922 pub fn from_input(input: &ProcessInput) -> Option<Self> {
923 match input {
924 ProcessInput::LashlangProcess { process_name, .. } => Some(Self {
925 name: process_name.clone(),
926 }),
927 ProcessInput::ToolCall { .. }
928 | ProcessInput::SessionTurn { .. }
929 | ProcessInput::External { .. } => None,
930 }
931 }
932}
933
934#[derive(Clone, Debug, PartialEq, Eq)]
935pub struct ProcessDefinitionSelector {
936 module_ref: lashlang::ModuleRef,
937 required_surface_ref: lashlang::RequiredSurfaceRef,
938 process_ref: lashlang::ProcessRef,
939 process_name: String,
940}
941
942impl ProcessDefinitionSelector {
943 pub fn decode(value: &serde_json::Value) -> Result<Self, String> {
944 if value
945 .get(lashlang::LASH_PROCESS_VALUE_KEY)
946 .and_then(serde_json::Value::as_bool)
947 != Some(true)
948 {
949 return Err("definition must be a process definition value".to_string());
950 }
951 Ok(Self {
952 module_ref: decode_process_definition_field(
953 value,
954 lashlang::LASH_MODULE_REF_KEY,
955 "definition",
956 )?,
957 required_surface_ref: decode_process_definition_field(
958 value,
959 lashlang::LASH_REQUIRED_SURFACE_REF_KEY,
960 "definition",
961 )?,
962 process_ref: decode_process_definition_field(
963 value,
964 lashlang::LASH_PROCESS_REF_KEY,
965 "definition",
966 )?,
967 process_name: value
968 .get(lashlang::LASH_PROCESS_NAME_KEY)
969 .and_then(serde_json::Value::as_str)
970 .ok_or_else(|| "definition is missing its process name".to_string())?
971 .to_string(),
972 })
973 }
974
975 pub fn matches_input(&self, input: &ProcessInput) -> bool {
976 match input {
977 ProcessInput::LashlangProcess {
978 module_ref,
979 process_ref,
980 required_surface_ref,
981 process_name,
982 ..
983 } => {
984 self.module_ref == *module_ref
985 && self.required_surface_ref == *required_surface_ref
986 && self.process_ref == *process_ref
987 && self.process_name == *process_name
988 }
989 ProcessInput::ToolCall { .. }
990 | ProcessInput::SessionTurn { .. }
991 | ProcessInput::External { .. } => false,
992 }
993 }
994}
995
996fn decode_process_definition_field<T: serde::de::DeserializeOwned>(
997 value: &serde_json::Value,
998 field: &'static str,
999 label: &'static str,
1000) -> Result<T, String> {
1001 serde_json::from_value(
1002 value
1003 .get(field)
1004 .cloned()
1005 .ok_or_else(|| format!("{label} is missing {field}"))?,
1006 )
1007 .map_err(|err| format!("{label} has invalid {field}: {err}"))
1008}
1009
1010#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)]
1011pub enum ProcessStatusFilter {
1012 #[default]
1013 Running,
1014 Completed,
1015 Failed,
1016 Cancelled,
1017 Any,
1018}
1019
1020impl ProcessStatusFilter {
1021 pub fn decode(value: Option<&str>) -> Result<Self, String> {
1022 match value.unwrap_or("running") {
1023 "running" => Ok(Self::Running),
1024 "completed" => Ok(Self::Completed),
1025 "failed" => Ok(Self::Failed),
1026 "cancelled" => Ok(Self::Cancelled),
1027 "any" => Ok(Self::Any),
1028 other => Err(format!(
1029 "processes.list status must be `running`, `completed`, `failed`, `cancelled`, or `any`, got `{other}`"
1030 )),
1031 }
1032 }
1033
1034 pub fn list_mode(self) -> ProcessListMode {
1035 match self {
1036 Self::Running => ProcessListMode::Live,
1037 Self::Completed | Self::Failed | Self::Cancelled | Self::Any => ProcessListMode::All,
1038 }
1039 }
1040
1041 pub fn matches(self, status: ProcessLifecycleStatus) -> bool {
1042 match self {
1043 Self::Running => status == ProcessLifecycleStatus::Running,
1044 Self::Completed => status == ProcessLifecycleStatus::Completed,
1045 Self::Failed => status == ProcessLifecycleStatus::Failed,
1046 Self::Cancelled => status == ProcessLifecycleStatus::Cancelled,
1047 Self::Any => true,
1048 }
1049 }
1050}
1051
1052#[derive(Clone, Debug, Default, PartialEq, Eq)]
1053pub struct ProcessListFilter {
1054 pub definition: Option<ProcessDefinitionSelector>,
1055 pub status: ProcessStatusFilter,
1056 pub waiting: Option<bool>,
1057}
1058
1059impl ProcessListFilter {
1060 pub fn decode(args: &serde_json::Value) -> Result<Self, String> {
1061 let map = args
1062 .as_object()
1063 .ok_or_else(|| "processes.list expects a record of process filters".to_string())?;
1064 for key in map.keys() {
1065 match key.as_str() {
1066 "definition" | "status" | "waiting" => {}
1067 _ => return Err(format!("processes.list unknown filter `{key}`")),
1068 }
1069 }
1070 let definition = args
1071 .get("definition")
1072 .map(ProcessDefinitionSelector::decode)
1073 .transpose()?;
1074 let status =
1075 ProcessStatusFilter::decode(args.get("status").and_then(serde_json::Value::as_str))?;
1076 let waiting = args
1077 .get("waiting")
1078 .map(|value| {
1079 value
1080 .as_bool()
1081 .ok_or_else(|| "processes.list `waiting` filter must be a boolean".to_string())
1082 })
1083 .transpose()?;
1084 Ok(Self {
1085 definition,
1086 status,
1087 waiting,
1088 })
1089 }
1090
1091 pub fn list_mode(&self) -> ProcessListMode {
1092 self.status.list_mode()
1093 }
1094
1095 pub fn matches_entry(&self, entry: &ProcessHandleGrantEntry) -> bool {
1096 let (_grant, record) = entry;
1097 self.matches_record(record)
1098 }
1099
1100 pub fn matches_record(&self, record: &ProcessRecord) -> bool {
1101 let status = ProcessLifecycleStatus::from(&record.status);
1102 self.status.matches(status)
1103 && self
1104 .definition
1105 .as_ref()
1106 .is_none_or(|definition| definition.matches_input(record.input.as_ref()))
1107 && self
1108 .waiting
1109 .is_none_or(|waiting| record.wait.is_some() == waiting)
1110 }
1111}
1112
1113#[derive(Clone, Copy, Debug, Default, PartialEq, Eq, Serialize, Deserialize)]
1114#[serde(rename_all = "snake_case")]
1115pub enum ProcessListMode {
1116 #[default]
1117 Live,
1118 All,
1119}
1120
1121impl ProcessListMode {
1122 pub fn as_str(self) -> &'static str {
1123 match self {
1124 Self::Live => "live",
1125 Self::All => "all",
1126 }
1127 }
1128}
1129
1130#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
1131pub struct ProcessStartGrant {
1132 pub session_scope: SessionScope,
1133 pub descriptor: ProcessHandleDescriptor,
1134}
1135
1136#[derive(Clone, Debug, Default, PartialEq, Eq, Serialize, Deserialize)]
1137pub struct ProcessSessionDeleteReport {
1138 pub session_id: String,
1139 pub revoked_handle_count: usize,
1140 pub deleted_wake_count: usize,
1141 pub orphaned_process_ids: Vec<String>,
1142 pub preserved_process_ids: Vec<String>,
1143}
1144
1145#[cfg(test)]
1146mod tests {
1147 use serde_json::json;
1148
1149 use super::*;
1150
1151 fn process_ref(component: &str, pos: usize) -> lashlang::ProcessRef {
1152 lashlang::ProcessRef {
1153 component: lashlang::ContentHash::new(component),
1154 pos: pos as u32,
1155 }
1156 }
1157
1158 fn process_value(
1159 module_ref: &lashlang::ModuleRef,
1160 surface_ref: &lashlang::RequiredSurfaceRef,
1161 process_ref: &lashlang::ProcessRef,
1162 name: &str,
1163 ) -> serde_json::Value {
1164 let mut value = serde_json::Map::new();
1165 value.insert(lashlang::LASH_PROCESS_VALUE_KEY.to_string(), json!(true));
1166 value.insert(lashlang::LASH_MODULE_REF_KEY.to_string(), json!(module_ref));
1167 value.insert(
1168 lashlang::LASH_REQUIRED_SURFACE_REF_KEY.to_string(),
1169 json!(surface_ref),
1170 );
1171 value.insert(
1172 lashlang::LASH_PROCESS_REF_KEY.to_string(),
1173 json!(process_ref),
1174 );
1175 value.insert(lashlang::LASH_PROCESS_NAME_KEY.to_string(), json!(name));
1176 serde_json::Value::Object(value)
1177 }
1178
1179 fn lashlang_entry(
1180 process_id: &str,
1181 module_ref: lashlang::ModuleRef,
1182 surface_ref: lashlang::RequiredSurfaceRef,
1183 process_ref: lashlang::ProcessRef,
1184 process_name: &str,
1185 status: ProcessStatus,
1186 ) -> ProcessHandleGrantEntry {
1187 let mut record = ProcessRecord::from_registration(
1188 ProcessRegistration::new(
1189 process_id,
1190 ProcessInput::LashlangProcess {
1191 module_ref,
1192 process_ref,
1193 required_surface_ref: surface_ref,
1194 process_name: process_name.to_string(),
1195 args: serde_json::Map::new(),
1196 },
1197 ProcessProvenance::host("model-test-host"),
1198 )
1199 .with_execution_env_ref(Some(ProcessExecutionEnvRef::new(format!(
1200 "process-env:test:{process_id}"
1201 )))),
1202 );
1203 record.status = status;
1204 (
1205 ProcessHandleGrant {
1206 session_id: "session".to_string(),
1207 process_id: process_id.to_string(),
1208 descriptor: ProcessHandleDescriptor::new(Some("lashlang"), Some(process_name)),
1209 },
1210 record,
1211 )
1212 }
1213
1214 #[test]
1215 fn process_list_filter_matches_definition_and_status() {
1216 let module_ref = lashlang::ModuleRef::new(&lashlang::ContentHash::new("module"));
1217 let surface_ref = lashlang::RequiredSurfaceRef::new(&lashlang::ContentHash::new("surface"));
1218 let target_ref = process_ref("target", 0);
1219 let other_ref = process_ref("other", 1);
1220 let filter = ProcessListFilter::decode(&json!({
1221 "definition": process_value(&module_ref, &surface_ref, &target_ref, "target"),
1222 "status": "completed"
1223 }))
1224 .expect("decode filter");
1225
1226 let matching = lashlang_entry(
1227 "matching",
1228 module_ref.clone(),
1229 surface_ref.clone(),
1230 target_ref,
1231 "target",
1232 ProcessStatus::Completed {
1233 await_output: ProcessAwaitOutput::from_tool_output(crate::ToolCallOutput::success(
1234 json!(true),
1235 )),
1236 },
1237 );
1238 let wrong_definition = lashlang_entry(
1239 "wrong-definition",
1240 module_ref,
1241 surface_ref,
1242 other_ref,
1243 "other",
1244 ProcessStatus::Completed {
1245 await_output: ProcessAwaitOutput::from_tool_output(crate::ToolCallOutput::success(
1246 json!(true),
1247 )),
1248 },
1249 );
1250
1251 assert_eq!(filter.list_mode(), ProcessListMode::All);
1252 assert!(filter.matches_entry(&matching));
1253 assert!(!filter.matches_entry(&wrong_definition));
1254 }
1255
1256 #[test]
1257 fn process_list_filter_matches_waiting_facet() {
1258 let module_ref = lashlang::ModuleRef::new(&lashlang::ContentHash::new("module"));
1259 let surface_ref = lashlang::RequiredSurfaceRef::new(&lashlang::ContentHash::new("surface"));
1260 let process_ref = process_ref("target", 0);
1261 let mut waiting_entry = lashlang_entry(
1262 "waiting",
1263 module_ref.clone(),
1264 surface_ref.clone(),
1265 process_ref.clone(),
1266 "target",
1267 ProcessStatus::Running,
1268 );
1269 waiting_entry.1.wait = Some(WaitState {
1270 since_ms: 42,
1271 kind: WaitKind::Signal {
1272 name: "ready".to_string(),
1273 event_type: "signal.ready".to_string(),
1274 key: "process:waiting:signal.ready:1".to_string(),
1275 ordinal: 1,
1276 },
1277 });
1278 let idle_entry = lashlang_entry(
1279 "idle",
1280 module_ref,
1281 surface_ref,
1282 process_ref,
1283 "target",
1284 ProcessStatus::Running,
1285 );
1286 let waiting_filter =
1287 ProcessListFilter::decode(&json!({ "waiting": true })).expect("decode waiting filter");
1288 let idle_filter =
1289 ProcessListFilter::decode(&json!({ "waiting": false })).expect("decode idle filter");
1290
1291 assert_eq!(waiting_filter.list_mode(), ProcessListMode::Live);
1292 assert!(waiting_filter.matches_entry(&waiting_entry));
1293 assert!(!waiting_filter.matches_entry(&idle_entry));
1294 assert!(!idle_filter.matches_entry(&waiting_entry));
1295 assert!(idle_filter.matches_entry(&idle_entry));
1296 assert!(
1297 ProcessListFilter::decode(&json!({ "waiting": "yes" }))
1298 .expect_err("invalid waiting filter")
1299 .contains("must be a boolean")
1300 );
1301 }
1302}