Skip to main content

lash_core/runtime/process/
model.rs

1use std::fmt;
2use std::sync::Arc;
3
4use serde::{Deserialize, Serialize};
5
6use super::events::{
7    ProcessAwaitOutput, ProcessEventType, ProcessTerminalSemantics, ProcessTerminalState,
8    default_process_event_types,
9};
10use super::time::current_epoch_ms;
11use super::validation::{
12    ensure_core_event_types, process_registration_hash, validate_process_registration,
13};
14
15pub type ProcessId = String;
16
17#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize)]
18#[serde(transparent)]
19pub struct SessionScopeId(String);
20
21impl SessionScopeId {
22    pub fn new(value: impl Into<String>) -> Self {
23        Self(value.into())
24    }
25
26    pub fn as_str(&self) -> &str {
27        &self.0
28    }
29}
30
31impl fmt::Display for SessionScopeId {
32    fn fmt(&self, formatter: &mut fmt::Formatter<'_>) -> fmt::Result {
33        formatter.write_str(&self.0)
34    }
35}
36
37impl From<String> for SessionScopeId {
38    fn from(value: String) -> Self {
39        Self::new(value)
40    }
41}
42
43impl From<&str> for SessionScopeId {
44    fn from(value: &str) -> Self {
45        Self::new(value)
46    }
47}
48
49/// Durable executable input for a process.
50#[derive(Debug, Serialize, Deserialize)]
51#[serde(tag = "type", rename_all = "snake_case")]
52pub enum ProcessInput {
53    ToolCall {
54        call: crate::PreparedToolCall,
55    },
56    LashlangProcess {
57        module_ref: lashlang::ModuleRef,
58        process_ref: lashlang::ProcessRef,
59        required_surface_ref: lashlang::RequiredSurfaceRef,
60        process_name: String,
61        #[serde(default)]
62        args: serde_json::Map<String, serde_json::Value>,
63    },
64    SessionTurn {
65        create_request: Box<crate::SessionCreateRequest>,
66        turn_input: Box<crate::TurnInput>,
67        output_contract: crate::ToolOutputContract,
68    },
69    External {
70        #[serde(default)]
71        metadata: serde_json::Value,
72    },
73}
74
75impl Clone for ProcessInput {
76    fn clone(&self) -> Self {
77        match self {
78            Self::ToolCall { call } => Self::ToolCall { call: call.clone() },
79            Self::LashlangProcess {
80                module_ref,
81                process_ref,
82                required_surface_ref,
83                process_name,
84                args,
85            } => Self::LashlangProcess {
86                module_ref: module_ref.clone(),
87                process_ref: process_ref.clone(),
88                required_surface_ref: required_surface_ref.clone(),
89                process_name: process_name.clone(),
90                args: args.clone(),
91            },
92            Self::SessionTurn {
93                create_request,
94                turn_input,
95                output_contract,
96            } => Self::SessionTurn {
97                create_request: create_request.clone(),
98                turn_input: turn_input.clone(),
99                output_contract: output_contract.clone(),
100            },
101            Self::External { metadata } => Self::External {
102                metadata: metadata.clone(),
103            },
104        }
105    }
106}
107
108#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
109#[serde(transparent)]
110pub struct ProcessExecutionEnvRef(String);
111
112impl ProcessExecutionEnvRef {
113    pub fn new(value: impl Into<String>) -> Self {
114        Self(value.into())
115    }
116
117    pub fn as_str(&self) -> &str {
118        &self.0
119    }
120}
121
122impl fmt::Display for ProcessExecutionEnvRef {
123    fn fmt(&self, formatter: &mut fmt::Formatter<'_>) -> fmt::Result {
124        formatter.write_str(&self.0)
125    }
126}
127
128#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
129pub struct ProcessExecutionEnvSpec {
130    #[serde(default)]
131    pub plugin_options: crate::PluginOptions,
132    #[serde(default)]
133    pub policy: crate::SessionPolicy,
134}
135
136impl ProcessExecutionEnvSpec {
137    pub fn new(plugin_options: crate::PluginOptions, policy: crate::SessionPolicy) -> Self {
138        Self {
139            plugin_options,
140            policy,
141        }
142    }
143
144    pub fn stable_ref(&self) -> Result<ProcessExecutionEnvRef, serde_json::Error> {
145        crate::stable_hash::stable_json_sha256_hex(self)
146            .map(|hash| ProcessExecutionEnvRef::new(format!("process-env:sha256:{hash}")))
147    }
148
149    pub fn to_store_bytes(&self) -> Result<Vec<u8>, serde_json::Error> {
150        serde_json::to_vec(self)
151    }
152
153    pub fn from_store_bytes(bytes: &[u8]) -> Result<Self, serde_json::Error> {
154        serde_json::from_slice(bytes)
155    }
156}
157
158pub async fn persist_process_execution_env(
159    artifact_store: &dyn lashlang::LashlangArtifactStore,
160    spec: &ProcessExecutionEnvSpec,
161) -> Result<ProcessExecutionEnvRef, crate::PluginError> {
162    let env_ref = spec.stable_ref().map_err(|err| {
163        crate::PluginError::Session(format!("failed to hash process execution env: {err}"))
164    })?;
165    let bytes = spec.to_store_bytes().map_err(|err| {
166        crate::PluginError::Session(format!("failed to encode process execution env: {err}"))
167    })?;
168    artifact_store
169        .put_artifact_bytes(env_ref.as_str(), "process_execution_env", &bytes)
170        .await
171        .map_err(|err| {
172            crate::PluginError::Session(format!(
173                "failed to persist process execution env `{env_ref}`: {err}"
174            ))
175        })?;
176    Ok(env_ref)
177}
178
179pub async fn load_process_execution_env(
180    artifact_store: &dyn lashlang::LashlangArtifactStore,
181    env_ref: &ProcessExecutionEnvRef,
182) -> Result<ProcessExecutionEnvSpec, crate::PluginError> {
183    let bytes = artifact_store
184        .get_artifact_bytes(env_ref.as_str())
185        .await
186        .map_err(|err| {
187            crate::PluginError::Session(format!(
188                "failed to load process execution env `{env_ref}`: {err}"
189            ))
190        })?
191        .ok_or_else(|| {
192            crate::PluginError::Session(format!("missing process execution env `{env_ref}`"))
193        })?;
194    ProcessExecutionEnvSpec::from_store_bytes(&bytes).map_err(|err| {
195        crate::PluginError::Session(format!(
196            "failed to decode process execution env `{env_ref}`: {err}"
197        ))
198    })
199}
200
201/// Execution-local context for process runners. Durable edges live on the
202/// process record.
203#[derive(Clone, Debug, Default, Serialize, Deserialize)]
204pub struct ProcessExecutionContext {
205    #[serde(default, skip_serializing_if = "Option::is_none")]
206    pub causal_invocation: Option<crate::RuntimeInvocation>,
207}
208
209impl ProcessExecutionContext {
210    pub fn with_causal_invocation(mut self, invocation: Option<crate::RuntimeInvocation>) -> Self {
211        self.causal_invocation = invocation;
212        self
213    }
214
215    pub fn is_empty(&self) -> bool {
216        self.causal_invocation.is_none()
217    }
218}
219
220#[derive(Clone)]
221pub struct ProcessOpScope<'scope> {
222    pub(crate) parent_invocation: Option<crate::RuntimeInvocation>,
223    pub(crate) effect_controller: crate::runtime::RuntimeEffectControllerHandle<'scope>,
224    pub(crate) agent_frame_id: Option<crate::AgentFrameId>,
225    pub(crate) target_agent_frame_id: Option<crate::AgentFrameId>,
226}
227
228impl<'scope> ProcessOpScope<'scope> {
229    pub fn new(scoped_effect_controller: crate::ScopedEffectController<'scope>) -> Self {
230        Self {
231            parent_invocation: None,
232            effect_controller: crate::runtime::RuntimeEffectControllerHandle::borrowed(
233                scoped_effect_controller,
234            ),
235            agent_frame_id: None,
236            target_agent_frame_id: None,
237        }
238    }
239
240    pub fn with_parent_invocation(
241        mut self,
242        parent_invocation: Option<crate::RuntimeInvocation>,
243    ) -> Self {
244        self.parent_invocation = parent_invocation;
245        self
246    }
247
248    pub fn with_agent_frame_id(mut self, agent_frame_id: Option<crate::AgentFrameId>) -> Self {
249        self.agent_frame_id = agent_frame_id;
250        self
251    }
252
253    pub fn with_target_agent_frame_id(
254        mut self,
255        agent_frame_id: Option<crate::AgentFrameId>,
256    ) -> Self {
257        self.target_agent_frame_id = agent_frame_id;
258        self
259    }
260
261    pub fn agent_frame_id(&self) -> Option<&str> {
262        self.agent_frame_id.as_deref()
263    }
264
265    pub fn target_agent_frame_id(&self) -> Option<&str> {
266        self.target_agent_frame_id.as_deref()
267    }
268
269    pub(crate) fn controller(&self) -> &dyn crate::RuntimeEffectController {
270        self.effect_controller.controller()
271    }
272}
273
274#[derive(Clone, Debug, Default)]
275pub struct ProcessStartOptions {
276    pub descriptor: Option<ProcessHandleDescriptor>,
277}
278
279impl ProcessStartOptions {
280    pub fn new() -> Self {
281        Self::default()
282    }
283
284    pub fn with_descriptor(mut self, descriptor: ProcessHandleDescriptor) -> Self {
285        self.descriptor = Some(descriptor);
286        self
287    }
288
289    pub fn with_optional_descriptor(mut self, descriptor: Option<ProcessHandleDescriptor>) -> Self {
290        self.descriptor = descriptor;
291        self
292    }
293
294    pub fn execution_context(&self, scope: &ProcessOpScope<'_>) -> ProcessExecutionContext {
295        ProcessExecutionContext {
296            causal_invocation: scope.parent_invocation.clone(),
297        }
298    }
299}
300
301/// Public host-facing request for starting a visible process handle.
302#[derive(Clone, Debug, Serialize, Deserialize)]
303pub struct ProcessStartRequest {
304    pub id: ProcessId,
305    pub input: ProcessInput,
306    #[serde(default, skip_serializing_if = "Option::is_none")]
307    pub env_spec: Option<ProcessExecutionEnvSpec>,
308    pub originator: ProcessOriginator,
309    #[serde(default, skip_serializing_if = "Option::is_none")]
310    pub wake_target: Option<SessionScope>,
311    #[serde(default, skip_serializing_if = "Option::is_none")]
312    pub grant: Option<ProcessStartGrant>,
313    #[serde(default)]
314    pub event_types: Vec<ProcessEventType>,
315}
316
317impl ProcessStartRequest {
318    pub fn new(
319        id: impl Into<ProcessId>,
320        input: ProcessInput,
321        originator: ProcessOriginator,
322    ) -> Self {
323        Self {
324            id: id.into(),
325            input,
326            env_spec: None,
327            originator,
328            wake_target: None,
329            grant: None,
330            event_types: default_process_event_types(),
331        }
332    }
333
334    pub fn external(
335        id: impl Into<ProcessId>,
336        originator: ProcessOriginator,
337        metadata: serde_json::Value,
338    ) -> Self {
339        Self::new(id, ProcessInput::External { metadata }, originator)
340    }
341
342    pub fn with_env_spec(mut self, env_spec: ProcessExecutionEnvSpec) -> Self {
343        self.env_spec = Some(env_spec);
344        self
345    }
346
347    pub fn with_wake_target(mut self, wake_target: Option<SessionScope>) -> Self {
348        self.wake_target = wake_target;
349        self
350    }
351
352    pub fn with_grant(mut self, grant: Option<ProcessStartGrant>) -> Self {
353        self.grant = grant;
354        self
355    }
356
357    pub fn with_event_types(
358        mut self,
359        event_types: impl IntoIterator<Item = ProcessEventType>,
360    ) -> Self {
361        self.event_types = event_types.into_iter().collect();
362        self
363    }
364
365    pub fn with_extra_event_types(
366        mut self,
367        event_types: impl IntoIterator<Item = ProcessEventType>,
368    ) -> Self {
369        self.event_types.extend(event_types);
370        self
371    }
372
373    pub fn into_registration(
374        self,
375        host_profile_id: impl Into<String>,
376        env_ref: Option<ProcessExecutionEnvRef>,
377    ) -> ProcessRegistration {
378        ProcessRegistration::new(
379            self.id,
380            self.input,
381            ProcessProvenance::new(self.originator, host_profile_id),
382        )
383        .with_event_types(self.event_types)
384        .with_execution_env_ref(env_ref)
385        .with_wake_target(self.wake_target)
386    }
387}
388
389#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
390pub struct SessionScope {
391    pub session_id: String,
392    #[serde(default, skip_serializing_if = "Option::is_none")]
393    pub agent_frame_id: Option<crate::AgentFrameId>,
394}
395
396#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
397pub struct ProcessProvenance {
398    pub originator: ProcessOriginator,
399    pub host_profile_id: String,
400    #[serde(default, skip_serializing_if = "Option::is_none")]
401    pub caused_by: Option<crate::CausalRef>,
402}
403
404impl ProcessProvenance {
405    pub fn new(originator: ProcessOriginator, host_profile_id: impl Into<String>) -> Self {
406        Self {
407            originator,
408            host_profile_id: host_profile_id.into(),
409            caused_by: None,
410        }
411    }
412
413    pub fn host(host_profile_id: impl Into<String>) -> Self {
414        Self::new(ProcessOriginator::host(), host_profile_id)
415    }
416
417    pub fn session(scope: SessionScope, host_profile_id: impl Into<String>) -> Self {
418        Self::new(ProcessOriginator::session(scope), host_profile_id)
419    }
420
421    pub fn with_caused_by(mut self, caused_by: Option<crate::CausalRef>) -> Self {
422        self.caused_by = caused_by;
423        self
424    }
425}
426
427#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
428#[serde(tag = "type", rename_all = "snake_case")]
429pub enum ProcessOriginator {
430    Host,
431    Session { scope: SessionScope },
432}
433
434impl ProcessOriginator {
435    pub fn host() -> Self {
436        Self::Host
437    }
438
439    pub fn session(scope: SessionScope) -> Self {
440        Self::Session { scope }
441    }
442
443    pub fn scope_id(&self) -> String {
444        match self {
445            Self::Host => "host".to_string(),
446            Self::Session { scope } => scope.id().to_string(),
447        }
448    }
449}
450
451impl SessionScope {
452    pub fn new(session_id: impl Into<String>) -> Self {
453        Self {
454            session_id: session_id.into(),
455            agent_frame_id: None,
456        }
457    }
458
459    pub fn for_agent_frame(
460        session_id: impl Into<String>,
461        agent_frame_id: impl Into<crate::AgentFrameId>,
462    ) -> Self {
463        Self {
464            session_id: session_id.into(),
465            agent_frame_id: Some(agent_frame_id.into()),
466        }
467    }
468
469    pub fn id(&self) -> SessionScopeId {
470        match self.agent_frame_id.as_deref() {
471            Some(frame_id) if !frame_id.is_empty() => {
472                SessionScopeId::new(format!("session:{}/frame:{frame_id}", self.session_id))
473            }
474            _ => SessionScopeId::new(format!("session:{}", self.session_id)),
475        }
476    }
477
478    pub fn is_empty(&self) -> bool {
479        self.session_id.is_empty()
480    }
481}
482
483/// Serializable process spec used to start or recover a runtime process.
484#[derive(Debug, Serialize, Deserialize)]
485pub struct ProcessRegistration {
486    pub id: ProcessId,
487    pub input: Arc<ProcessInput>,
488    #[serde(default)]
489    pub event_types: Vec<ProcessEventType>,
490    pub provenance: ProcessProvenance,
491    #[serde(default, skip_serializing_if = "Option::is_none")]
492    pub env_ref: Option<ProcessExecutionEnvRef>,
493    #[serde(default, skip_serializing_if = "Option::is_none")]
494    pub wake_target: Option<SessionScope>,
495}
496
497impl Clone for ProcessRegistration {
498    fn clone(&self) -> Self {
499        Self {
500            id: self.id.clone(),
501            input: Arc::clone(&self.input),
502            event_types: self.event_types.clone(),
503            provenance: self.provenance.clone(),
504            env_ref: self.env_ref.clone(),
505            wake_target: self.wake_target.clone(),
506        }
507    }
508}
509
510impl ProcessRegistration {
511    pub fn new(
512        id: impl Into<ProcessId>,
513        input: ProcessInput,
514        provenance: ProcessProvenance,
515    ) -> Self {
516        Self {
517            id: id.into(),
518            input: Arc::new(input),
519            event_types: default_process_event_types(),
520            provenance,
521            env_ref: None,
522            wake_target: None,
523        }
524    }
525
526    pub(crate) fn session_start_draft(id: impl Into<ProcessId>, input: ProcessInput) -> Self {
527        Self::new(id, input, ProcessProvenance::host("session-start-draft"))
528    }
529
530    pub fn with_process_provenance(mut self, provenance: ProcessProvenance) -> Self {
531        self.provenance = provenance;
532        self
533    }
534
535    pub fn with_execution_env_ref(mut self, env_ref: Option<ProcessExecutionEnvRef>) -> Self {
536        self.env_ref = env_ref;
537        self
538    }
539
540    pub fn with_wake_target(mut self, wake_target: Option<SessionScope>) -> Self {
541        self.wake_target = wake_target;
542        self
543    }
544
545    pub fn with_event_types(
546        mut self,
547        event_types: impl IntoIterator<Item = ProcessEventType>,
548    ) -> Self {
549        self.event_types = event_types.into_iter().collect();
550        self
551    }
552
553    pub fn with_extra_event_types(
554        mut self,
555        event_types: impl IntoIterator<Item = ProcessEventType>,
556    ) -> Self {
557        self.event_types.extend(event_types);
558        self
559    }
560}
561
562#[derive(Clone, Debug, Default, PartialEq, Serialize, Deserialize)]
563#[serde(tag = "state", rename_all = "snake_case")]
564pub enum ProcessStatus {
565    #[default]
566    Running,
567    Completed {
568        await_output: ProcessAwaitOutput,
569    },
570    Failed {
571        await_output: ProcessAwaitOutput,
572    },
573    Cancelled {
574        await_output: ProcessAwaitOutput,
575    },
576}
577
578impl ProcessStatus {
579    pub fn from_terminal(terminal: ProcessTerminalSemantics) -> Self {
580        match terminal.state {
581            ProcessTerminalState::Completed => Self::Completed {
582                await_output: terminal.await_output,
583            },
584            ProcessTerminalState::Failed => Self::Failed {
585                await_output: terminal.await_output,
586            },
587            ProcessTerminalState::Cancelled => Self::Cancelled {
588                await_output: terminal.await_output,
589            },
590        }
591    }
592
593    pub fn is_terminal(&self) -> bool {
594        !matches!(self, Self::Running)
595    }
596
597    pub fn label(&self) -> &'static str {
598        match self {
599            Self::Running => "running",
600            Self::Completed { .. } => "completed",
601            Self::Failed { .. } => "failed",
602            Self::Cancelled { .. } => "cancelled",
603        }
604    }
605
606    pub fn terminal_state(&self) -> Option<ProcessTerminalState> {
607        match self {
608            Self::Running => None,
609            Self::Completed { .. } => Some(ProcessTerminalState::Completed),
610            Self::Failed { .. } => Some(ProcessTerminalState::Failed),
611            Self::Cancelled { .. } => Some(ProcessTerminalState::Cancelled),
612        }
613    }
614
615    pub fn await_output(&self) -> Option<&ProcessAwaitOutput> {
616        match self {
617            Self::Running => None,
618            Self::Completed { await_output }
619            | Self::Failed { await_output }
620            | Self::Cancelled { await_output } => Some(await_output),
621        }
622    }
623
624    pub fn terminal_semantics(&self) -> Option<ProcessTerminalSemantics> {
625        Some(ProcessTerminalSemantics {
626            state: self.terminal_state()?,
627            await_output: self.await_output()?.clone(),
628        })
629    }
630}
631
632/// Durable process row. Session-visible addressability lives in
633/// [`ProcessHandleGrant`], not in the process record.
634#[derive(Clone, Debug, Serialize, Deserialize)]
635pub struct ProcessRecord {
636    pub id: ProcessId,
637    pub registration_hash: String,
638    pub input: Arc<ProcessInput>,
639    #[serde(default)]
640    pub event_types: Vec<ProcessEventType>,
641    pub provenance: ProcessProvenance,
642    #[serde(default, skip_serializing_if = "Option::is_none")]
643    pub env_ref: Option<ProcessExecutionEnvRef>,
644    #[serde(default, skip_serializing_if = "Option::is_none")]
645    pub wake_target: Option<SessionScope>,
646    #[serde(default)]
647    pub created_at_ms: u64,
648    #[serde(default)]
649    pub updated_at_ms: u64,
650    #[serde(default, skip_serializing_if = "Option::is_none")]
651    pub external_ref: Option<ProcessExternalRef>,
652    #[serde(default, skip_serializing_if = "Option::is_none")]
653    pub wait: Option<WaitState>,
654    #[serde(default)]
655    pub status: ProcessStatus,
656}
657
658#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
659pub struct WaitState {
660    pub kind: WaitKind,
661    pub since_ms: u64,
662}
663
664#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
665#[serde(tag = "kind", rename_all = "snake_case")]
666pub enum WaitKind {
667    Signal {
668        name: String,
669        event_type: String,
670        key: String,
671        ordinal: u64,
672    },
673}
674
675impl ProcessRecord {
676    pub fn from_registration(mut registration: ProcessRegistration) -> Self {
677        ensure_core_event_types(&mut registration);
678        validate_process_registration(&registration)
679            .expect("process registration should be valid before record construction");
680        let registration_hash = process_registration_hash(&registration)
681            .expect("process registration should hash before record construction");
682        Self::from_prepared_registration(registration, registration_hash, current_epoch_ms())
683    }
684
685    pub fn from_prepared_registration(
686        registration: ProcessRegistration,
687        registration_hash: String,
688        now_ms: u64,
689    ) -> Self {
690        Self {
691            id: registration.id,
692            registration_hash,
693            input: registration.input,
694            event_types: registration.event_types,
695            provenance: registration.provenance,
696            env_ref: registration.env_ref,
697            wake_target: registration.wake_target,
698            created_at_ms: now_ms,
699            updated_at_ms: now_ms,
700            external_ref: None,
701            wait: None,
702            status: ProcessStatus::Running,
703        }
704    }
705
706    pub fn is_terminal(&self) -> bool {
707        self.status.is_terminal()
708    }
709
710    pub fn originator_scope_id(&self) -> String {
711        self.provenance.originator.scope_id()
712    }
713
714    pub fn host_profile_id(&self) -> &str {
715        &self.provenance.host_profile_id
716    }
717}
718
719/// Wire-format version stamped on every persisted [`ProcessLease`].
720///
721/// Bump when the on-wire shape of `ProcessLease` changes in a way that older
722/// code cannot safely deserialize.
723pub const PROCESS_LEASE_SCHEMA_VERSION: u32 = 1;
724
725/// Durable lease over a non-terminal background process.
726///
727/// The lease pair `(owner_id, lease_token)` plus `fencing_token` are how lash guarantees that
728/// one non-terminal process is re-executed by exactly one worker at a time —
729/// even after a crash, even across two workers that both sweep the same
730/// registry for recoverable work. The durable backend
731/// (`lash-sqlite-store`) uses these to serialize concurrent claims on the same
732/// `process_id`; future distributed durable backends use the *same* fields to
733/// coordinate workers that don't share a file system.
734///
735/// **This is not single-process theatre.** The owner / fencing-token /
736/// lease-token triple is the public contract that lets any backend detect and
737/// reject stale writers. Treat it as load-bearing, not defensive.
738#[derive(Clone, Debug, Serialize, Deserialize)]
739pub struct ProcessLease {
740    pub schema_version: u32,
741    pub process_id: ProcessId,
742    pub owner_id: String,
743    pub lease_token: String,
744    pub fencing_token: u64,
745    pub claimed_at_epoch_ms: u64,
746    pub expires_at_epoch_ms: u64,
747}
748
749#[derive(Clone, Debug, Serialize, Deserialize)]
750pub struct ProcessLeaseCompletion {
751    pub process_id: ProcessId,
752    pub lease_token: String,
753}
754
755impl ProcessLeaseCompletion {
756    pub fn from_lease(lease: &ProcessLease) -> Self {
757        Self {
758            process_id: lease.process_id.clone(),
759            lease_token: lease.lease_token.clone(),
760        }
761    }
762}
763
764/// Durable backend reference for background work accepted outside the local process.
765#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq)]
766pub struct ProcessExternalRef {
767    pub backend: String,
768    pub id: String,
769    #[serde(default, skip_serializing_if = "Option::is_none")]
770    pub metadata: Option<serde_json::Value>,
771}
772
773#[derive(Clone, Debug, Default, PartialEq, Eq, Serialize, Deserialize)]
774pub struct ProcessHandleDescriptor {
775    #[serde(default, skip_serializing_if = "Option::is_none")]
776    pub kind: Option<String>,
777    #[serde(default, skip_serializing_if = "Option::is_none")]
778    pub label: Option<String>,
779}
780
781impl ProcessHandleDescriptor {
782    pub fn new(kind: Option<impl Into<String>>, label: Option<impl Into<String>>) -> Self {
783        Self {
784            kind: kind.map(Into::into),
785            label: label.map(Into::into),
786        }
787    }
788}
789
790#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
791pub struct ProcessHandleGrant {
792    pub session_id: String,
793    pub process_id: ProcessId,
794    pub descriptor: ProcessHandleDescriptor,
795}
796
797pub type ProcessHandleGrantEntry = (ProcessHandleGrant, ProcessRecord);
798
799#[derive(Clone, Copy, Debug, Default, PartialEq, Eq, Serialize, Deserialize)]
800#[serde(rename_all = "snake_case")]
801pub enum ProcessLifecycleStatus {
802    #[default]
803    Running,
804    Completed,
805    Failed,
806    Cancelled,
807}
808
809impl ProcessLifecycleStatus {
810    pub fn label(self) -> &'static str {
811        match self {
812            Self::Running => "running",
813            Self::Completed => "completed",
814            Self::Failed => "failed",
815            Self::Cancelled => "cancelled",
816        }
817    }
818
819    pub fn is_terminal(self) -> bool {
820        !matches!(self, Self::Running)
821    }
822
823    pub fn terminal_state(self) -> Option<ProcessTerminalState> {
824        match self {
825            Self::Running => None,
826            Self::Completed => Some(ProcessTerminalState::Completed),
827            Self::Failed => Some(ProcessTerminalState::Failed),
828            Self::Cancelled => Some(ProcessTerminalState::Cancelled),
829        }
830    }
831}
832
833impl From<&ProcessStatus> for ProcessLifecycleStatus {
834    fn from(status: &ProcessStatus) -> Self {
835        match status {
836            ProcessStatus::Running => Self::Running,
837            ProcessStatus::Completed { .. } => Self::Completed,
838            ProcessStatus::Failed { .. } => Self::Failed,
839            ProcessStatus::Cancelled { .. } => Self::Cancelled,
840        }
841    }
842}
843
844impl From<ProcessStatus> for ProcessLifecycleStatus {
845    fn from(status: ProcessStatus) -> Self {
846        Self::from(&status)
847    }
848}
849
850#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
851pub struct ProcessHandleSummary {
852    #[serde(rename = "__handle__")]
853    pub handle_type: String,
854    pub id: ProcessId,
855    pub process_id: ProcessId,
856    pub descriptor: ProcessHandleDescriptor,
857    #[serde(default, skip_serializing_if = "Option::is_none")]
858    pub definition: Option<ProcessDefinitionSummary>,
859    pub status: ProcessLifecycleStatus,
860}
861
862impl ProcessHandleSummary {
863    pub fn new(
864        process_id: impl Into<ProcessId>,
865        descriptor: ProcessHandleDescriptor,
866        status: ProcessLifecycleStatus,
867    ) -> Self {
868        let process_id = process_id.into();
869        Self {
870            handle_type: "process".to_string(),
871            id: process_id.clone(),
872            process_id,
873            descriptor,
874            definition: None,
875            status,
876        }
877    }
878
879    pub fn with_definition(mut self, definition: Option<ProcessDefinitionSummary>) -> Self {
880        self.definition = definition;
881        self
882    }
883
884    pub fn from_grant_record(grant: ProcessHandleGrant, record: ProcessRecord) -> Self {
885        let definition = ProcessDefinitionSummary::from_input(record.input.as_ref());
886        Self::new(
887            record.id,
888            grant.descriptor,
889            ProcessLifecycleStatus::from(record.status),
890        )
891        .with_definition(definition)
892    }
893}
894
895impl From<ProcessHandleGrantEntry> for ProcessHandleSummary {
896    fn from((grant, record): ProcessHandleGrantEntry) -> Self {
897        Self::from_grant_record(grant, record)
898    }
899}
900
901#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
902pub struct ProcessCancelSummary {
903    pub process_id: ProcessId,
904    pub status: ProcessLifecycleStatus,
905}
906
907impl ProcessCancelSummary {
908    pub fn from_record(record: ProcessRecord) -> Self {
909        Self {
910            process_id: record.id,
911            status: ProcessLifecycleStatus::from(record.status),
912        }
913    }
914}
915
916#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
917pub struct ProcessDefinitionSummary {
918    pub name: String,
919}
920
921impl ProcessDefinitionSummary {
922    pub fn from_input(input: &ProcessInput) -> Option<Self> {
923        match input {
924            ProcessInput::LashlangProcess { process_name, .. } => Some(Self {
925                name: process_name.clone(),
926            }),
927            ProcessInput::ToolCall { .. }
928            | ProcessInput::SessionTurn { .. }
929            | ProcessInput::External { .. } => None,
930        }
931    }
932}
933
934#[derive(Clone, Debug, PartialEq, Eq)]
935pub struct ProcessDefinitionSelector {
936    module_ref: lashlang::ModuleRef,
937    required_surface_ref: lashlang::RequiredSurfaceRef,
938    process_ref: lashlang::ProcessRef,
939    process_name: String,
940}
941
942impl ProcessDefinitionSelector {
943    pub fn decode(value: &serde_json::Value) -> Result<Self, String> {
944        if value
945            .get(lashlang::LASH_PROCESS_VALUE_KEY)
946            .and_then(serde_json::Value::as_bool)
947            != Some(true)
948        {
949            return Err("definition must be a process definition value".to_string());
950        }
951        Ok(Self {
952            module_ref: decode_process_definition_field(
953                value,
954                lashlang::LASH_MODULE_REF_KEY,
955                "definition",
956            )?,
957            required_surface_ref: decode_process_definition_field(
958                value,
959                lashlang::LASH_REQUIRED_SURFACE_REF_KEY,
960                "definition",
961            )?,
962            process_ref: decode_process_definition_field(
963                value,
964                lashlang::LASH_PROCESS_REF_KEY,
965                "definition",
966            )?,
967            process_name: value
968                .get(lashlang::LASH_PROCESS_NAME_KEY)
969                .and_then(serde_json::Value::as_str)
970                .ok_or_else(|| "definition is missing its process name".to_string())?
971                .to_string(),
972        })
973    }
974
975    pub fn matches_input(&self, input: &ProcessInput) -> bool {
976        match input {
977            ProcessInput::LashlangProcess {
978                module_ref,
979                process_ref,
980                required_surface_ref,
981                process_name,
982                ..
983            } => {
984                self.module_ref == *module_ref
985                    && self.required_surface_ref == *required_surface_ref
986                    && self.process_ref == *process_ref
987                    && self.process_name == *process_name
988            }
989            ProcessInput::ToolCall { .. }
990            | ProcessInput::SessionTurn { .. }
991            | ProcessInput::External { .. } => false,
992        }
993    }
994}
995
996fn decode_process_definition_field<T: serde::de::DeserializeOwned>(
997    value: &serde_json::Value,
998    field: &'static str,
999    label: &'static str,
1000) -> Result<T, String> {
1001    serde_json::from_value(
1002        value
1003            .get(field)
1004            .cloned()
1005            .ok_or_else(|| format!("{label} is missing {field}"))?,
1006    )
1007    .map_err(|err| format!("{label} has invalid {field}: {err}"))
1008}
1009
/// Lifecycle-status facet of a `processes.list` filter.
///
/// Defaults to `Running`.
#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)]
pub enum ProcessStatusFilter {
    /// Match only running processes (the default).
    #[default]
    Running,
    /// Match only completed processes.
    Completed,
    /// Match only failed processes.
    Failed,
    /// Match only cancelled processes.
    Cancelled,
    /// Match processes in any lifecycle status.
    Any,
}
1019
1020impl ProcessStatusFilter {
1021    pub fn decode(value: Option<&str>) -> Result<Self, String> {
1022        match value.unwrap_or("running") {
1023            "running" => Ok(Self::Running),
1024            "completed" => Ok(Self::Completed),
1025            "failed" => Ok(Self::Failed),
1026            "cancelled" => Ok(Self::Cancelled),
1027            "any" => Ok(Self::Any),
1028            other => Err(format!(
1029                "processes.list status must be `running`, `completed`, `failed`, `cancelled`, or `any`, got `{other}`"
1030            )),
1031        }
1032    }
1033
1034    pub fn list_mode(self) -> ProcessListMode {
1035        match self {
1036            Self::Running => ProcessListMode::Live,
1037            Self::Completed | Self::Failed | Self::Cancelled | Self::Any => ProcessListMode::All,
1038        }
1039    }
1040
1041    pub fn matches(self, status: ProcessLifecycleStatus) -> bool {
1042        match self {
1043            Self::Running => status == ProcessLifecycleStatus::Running,
1044            Self::Completed => status == ProcessLifecycleStatus::Completed,
1045            Self::Failed => status == ProcessLifecycleStatus::Failed,
1046            Self::Cancelled => status == ProcessLifecycleStatus::Cancelled,
1047            Self::Any => true,
1048        }
1049    }
1050}
1051
/// Decoded `processes.list` filter.
///
/// A record must satisfy every facet that is set; `None` facets match
/// everything.
#[derive(Clone, Debug, Default, PartialEq, Eq)]
pub struct ProcessListFilter {
    /// When set, only processes started from this definition match.
    pub definition: Option<ProcessDefinitionSelector>,
    /// Lifecycle-status facet.
    pub status: ProcessStatusFilter,
    /// When set, only records whose `wait` presence equals this flag match.
    pub waiting: Option<bool>,
}
1058
1059impl ProcessListFilter {
1060    pub fn decode(args: &serde_json::Value) -> Result<Self, String> {
1061        let map = args
1062            .as_object()
1063            .ok_or_else(|| "processes.list expects a record of process filters".to_string())?;
1064        for key in map.keys() {
1065            match key.as_str() {
1066                "definition" | "status" | "waiting" => {}
1067                _ => return Err(format!("processes.list unknown filter `{key}`")),
1068            }
1069        }
1070        let definition = args
1071            .get("definition")
1072            .map(ProcessDefinitionSelector::decode)
1073            .transpose()?;
1074        let status =
1075            ProcessStatusFilter::decode(args.get("status").and_then(serde_json::Value::as_str))?;
1076        let waiting = args
1077            .get("waiting")
1078            .map(|value| {
1079                value
1080                    .as_bool()
1081                    .ok_or_else(|| "processes.list `waiting` filter must be a boolean".to_string())
1082            })
1083            .transpose()?;
1084        Ok(Self {
1085            definition,
1086            status,
1087            waiting,
1088        })
1089    }
1090
1091    pub fn list_mode(&self) -> ProcessListMode {
1092        self.status.list_mode()
1093    }
1094
1095    pub fn matches_entry(&self, entry: &ProcessHandleGrantEntry) -> bool {
1096        let (_grant, record) = entry;
1097        self.matches_record(record)
1098    }
1099
1100    pub fn matches_record(&self, record: &ProcessRecord) -> bool {
1101        let status = ProcessLifecycleStatus::from(&record.status);
1102        self.status.matches(status)
1103            && self
1104                .definition
1105                .as_ref()
1106                .is_none_or(|definition| definition.matches_input(record.input.as_ref()))
1107            && self
1108                .waiting
1109                .is_none_or(|waiting| record.wait.is_some() == waiting)
1110    }
1111}
1112
/// Listing mode for process queries; serialized in snake case.
///
/// Defaults to `Live`.
// NOTE(review): presumably `Live` restricts a listing to processes that are
// still live while `All` also includes finished ones — confirm against the
// store implementation.
#[derive(Clone, Copy, Debug, Default, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum ProcessListMode {
    /// The default mode; chosen for the `Running` status filter.
    #[default]
    Live,
    /// Chosen for every status filter other than `Running`.
    All,
}
1120
1121impl ProcessListMode {
1122    pub fn as_str(self) -> &'static str {
1123        match self {
1124            Self::Live => "live",
1125            Self::All => "all",
1126        }
1127    }
1128}
1129
/// Pairs a session scope with a process handle descriptor.
// NOTE(review): presumably describes the handle grant to create when a
// process is started on behalf of a session — confirm against callers.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub struct ProcessStartGrant {
    /// Session scope the grant applies to.
    pub session_scope: SessionScope,
    /// Descriptor for the process handle.
    pub descriptor: ProcessHandleDescriptor,
}
1135
/// Report describing what happened to process state when a session was
/// deleted.
// NOTE(review): field meanings below are inferred from the field names only —
// confirm against the code that populates this report.
#[derive(Clone, Debug, Default, PartialEq, Eq, Serialize, Deserialize)]
pub struct ProcessSessionDeleteReport {
    /// Identifier of the session the report is about.
    pub session_id: String,
    /// Number of process handles revoked.
    pub revoked_handle_count: usize,
    /// Number of wakes deleted.
    pub deleted_wake_count: usize,
    /// Ids of processes reported as orphaned.
    pub orphaned_process_ids: Vec<String>,
    /// Ids of processes reported as preserved.
    pub preserved_process_ids: Vec<String>,
}
1144
#[cfg(test)]
mod tests {
    use serde_json::json;

    use super::*;

    /// Builds a `ProcessRef` whose component hash is derived from `component`.
    ///
    /// `pos` is taken as `u32`, the field's own type, so no narrowing cast is
    /// needed.
    fn process_ref(component: &str, pos: u32) -> lashlang::ProcessRef {
        lashlang::ProcessRef {
            component: lashlang::ContentHash::new(component),
            pos,
        }
    }

    /// Builds the JSON process definition value that
    /// `ProcessDefinitionSelector::decode` accepts.
    fn process_value(
        module_ref: &lashlang::ModuleRef,
        surface_ref: &lashlang::RequiredSurfaceRef,
        process_ref: &lashlang::ProcessRef,
        name: &str,
    ) -> serde_json::Value {
        let mut value = serde_json::Map::new();
        value.insert(lashlang::LASH_PROCESS_VALUE_KEY.to_string(), json!(true));
        value.insert(lashlang::LASH_MODULE_REF_KEY.to_string(), json!(module_ref));
        value.insert(
            lashlang::LASH_REQUIRED_SURFACE_REF_KEY.to_string(),
            json!(surface_ref),
        );
        value.insert(
            lashlang::LASH_PROCESS_REF_KEY.to_string(),
            json!(process_ref),
        );
        value.insert(lashlang::LASH_PROCESS_NAME_KEY.to_string(), json!(name));
        serde_json::Value::Object(value)
    }

    /// A `Completed` status carrying a trivial successful tool output.
    fn completed_status() -> ProcessStatus {
        ProcessStatus::Completed {
            await_output: ProcessAwaitOutput::from_tool_output(crate::ToolCallOutput::success(
                json!(true),
            )),
        }
    }

    /// Builds a grant entry for a Lashlang process with the given identity
    /// and status.
    fn lashlang_entry(
        process_id: &str,
        module_ref: lashlang::ModuleRef,
        surface_ref: lashlang::RequiredSurfaceRef,
        process_ref: lashlang::ProcessRef,
        process_name: &str,
        status: ProcessStatus,
    ) -> ProcessHandleGrantEntry {
        let mut record = ProcessRecord::from_registration(
            ProcessRegistration::new(
                process_id,
                ProcessInput::LashlangProcess {
                    module_ref,
                    process_ref,
                    required_surface_ref: surface_ref,
                    process_name: process_name.to_string(),
                    args: serde_json::Map::new(),
                },
                ProcessProvenance::host("model-test-host"),
            )
            .with_execution_env_ref(Some(ProcessExecutionEnvRef::new(format!(
                "process-env:test:{process_id}"
            )))),
        );
        record.status = status;
        (
            ProcessHandleGrant {
                session_id: "session".to_string(),
                process_id: process_id.to_string(),
                descriptor: ProcessHandleDescriptor::new(Some("lashlang"), Some(process_name)),
            },
            record,
        )
    }

    #[test]
    fn process_list_filter_matches_definition_and_status() {
        let module_ref = lashlang::ModuleRef::new(&lashlang::ContentHash::new("module"));
        let surface_ref = lashlang::RequiredSurfaceRef::new(&lashlang::ContentHash::new("surface"));
        let target_ref = process_ref("target", 0);
        let other_ref = process_ref("other", 1);
        let filter = ProcessListFilter::decode(&json!({
            "definition": process_value(&module_ref, &surface_ref, &target_ref, "target"),
            "status": "completed"
        }))
        .expect("decode filter");

        let matching = lashlang_entry(
            "matching",
            module_ref.clone(),
            surface_ref.clone(),
            target_ref,
            "target",
            completed_status(),
        );
        let wrong_definition = lashlang_entry(
            "wrong-definition",
            module_ref,
            surface_ref,
            other_ref,
            "other",
            completed_status(),
        );

        assert_eq!(filter.list_mode(), ProcessListMode::All);
        assert!(filter.matches_entry(&matching));
        assert!(!filter.matches_entry(&wrong_definition));
    }

    #[test]
    fn process_list_filter_matches_waiting_facet() {
        let module_ref = lashlang::ModuleRef::new(&lashlang::ContentHash::new("module"));
        let surface_ref = lashlang::RequiredSurfaceRef::new(&lashlang::ContentHash::new("surface"));
        let target_ref = process_ref("target", 0);
        let mut waiting_entry = lashlang_entry(
            "waiting",
            module_ref.clone(),
            surface_ref.clone(),
            target_ref.clone(),
            "target",
            ProcessStatus::Running,
        );
        waiting_entry.1.wait = Some(WaitState {
            since_ms: 42,
            kind: WaitKind::Signal {
                name: "ready".to_string(),
                event_type: "signal.ready".to_string(),
                key: "process:waiting:signal.ready:1".to_string(),
                ordinal: 1,
            },
        });
        let idle_entry = lashlang_entry(
            "idle",
            module_ref,
            surface_ref,
            target_ref,
            "target",
            ProcessStatus::Running,
        );
        let waiting_filter =
            ProcessListFilter::decode(&json!({ "waiting": true })).expect("decode waiting filter");
        let idle_filter =
            ProcessListFilter::decode(&json!({ "waiting": false })).expect("decode idle filter");

        assert_eq!(waiting_filter.list_mode(), ProcessListMode::Live);
        assert!(waiting_filter.matches_entry(&waiting_entry));
        assert!(!waiting_filter.matches_entry(&idle_entry));
        assert!(!idle_filter.matches_entry(&waiting_entry));
        assert!(idle_filter.matches_entry(&idle_entry));
        assert!(
            ProcessListFilter::decode(&json!({ "waiting": "yes" }))
                .expect_err("invalid waiting filter")
                .contains("must be a boolean")
        );
    }

    #[test]
    fn process_list_filter_decode_defaults_and_rejections() {
        // An empty record decodes to the default filter (running, no facets).
        assert_eq!(
            ProcessListFilter::decode(&json!({})).expect("decode empty filter"),
            ProcessListFilter::default()
        );
        assert!(
            ProcessListFilter::decode(&json!([]))
                .expect_err("non-record filter")
                .contains("expects a record")
        );
        assert!(
            ProcessListFilter::decode(&json!({ "bogus": true }))
                .expect_err("unknown filter key")
                .contains("unknown filter `bogus`")
        );
        assert!(
            ProcessListFilter::decode(&json!({ "status": "bogus" }))
                .expect_err("unknown status name")
                .contains("got `bogus`")
        );
    }
}