// lash_core/runtime/process/model.rs

1use std::fmt;
2use std::sync::Arc;
3
4use serde::{Deserialize, Serialize};
5
6use super::events::{
7    ProcessAwaitOutput, ProcessEventType, ProcessTerminalSemantics, ProcessTerminalState,
8    default_process_event_types,
9};
10use super::time::current_epoch_ms;
11use super::validation::{
12    ensure_core_event_types, process_registration_hash, validate_process_registration,
13};
14
/// Identifier of a runtime process; a plain alias for `String`.
pub type ProcessId = String;
16
/// String identifier of a session scope.
///
/// Serialized transparently as its inner string. [`SessionScope::id`] produces
/// values of the form `session:<session_id>` or
/// `session:<session_id>/frame:<frame_id>`.
#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize)]
#[serde(transparent)]
pub struct SessionScopeId(String);
20
21impl SessionScopeId {
22    pub fn new(value: impl Into<String>) -> Self {
23        Self(value.into())
24    }
25
26    pub fn as_str(&self) -> &str {
27        &self.0
28    }
29}
30
31impl fmt::Display for SessionScopeId {
32    fn fmt(&self, formatter: &mut fmt::Formatter<'_>) -> fmt::Result {
33        formatter.write_str(&self.0)
34    }
35}
36
37impl From<String> for SessionScopeId {
38    fn from(value: String) -> Self {
39        Self::new(value)
40    }
41}
42
43impl From<&str> for SessionScopeId {
44    fn from(value: &str) -> Self {
45        Self::new(value)
46    }
47}
48
49/// Durable executable input for a process.
50#[derive(Debug, Serialize, Deserialize)]
51#[serde(tag = "type", rename_all = "snake_case")]
52pub enum ProcessInput {
53    ToolCall {
54        call: crate::PreparedToolCall,
55    },
56    LashlangProcess {
57        module_ref: lashlang::ModuleRef,
58        process_ref: lashlang::ProcessRef,
59        required_surface_ref: lashlang::RequiredSurfaceRef,
60        process_name: String,
61        #[serde(default)]
62        args: serde_json::Map<String, serde_json::Value>,
63    },
64    SessionTurn {
65        create_request: Box<crate::SessionCreateRequest>,
66        turn_input: Box<crate::TurnInput>,
67        output_contract: crate::ToolOutputContract,
68    },
69    External {
70        #[serde(default)]
71        metadata: serde_json::Value,
72    },
73}
74
75impl Clone for ProcessInput {
76    fn clone(&self) -> Self {
77        match self {
78            Self::ToolCall { call } => Self::ToolCall { call: call.clone() },
79            Self::LashlangProcess {
80                module_ref,
81                process_ref,
82                required_surface_ref,
83                process_name,
84                args,
85            } => Self::LashlangProcess {
86                module_ref: module_ref.clone(),
87                process_ref: process_ref.clone(),
88                required_surface_ref: required_surface_ref.clone(),
89                process_name: process_name.clone(),
90                args: args.clone(),
91            },
92            Self::SessionTurn {
93                create_request,
94                turn_input,
95                output_contract,
96            } => Self::SessionTurn {
97                create_request: create_request.clone(),
98                turn_input: turn_input.clone(),
99                output_contract: output_contract.clone(),
100            },
101            Self::External { metadata } => Self::External {
102                metadata: metadata.clone(),
103            },
104        }
105    }
106}
107
/// String reference naming a stored process execution environment.
///
/// Serialized transparently as its inner string. References produced by
/// [`ProcessExecutionEnvSpec::stable_ref`] have the form
/// `process-env:sha256:<hash>`.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
#[serde(transparent)]
pub struct ProcessExecutionEnvRef(String);
111
112impl ProcessExecutionEnvRef {
113    pub fn new(value: impl Into<String>) -> Self {
114        Self(value.into())
115    }
116
117    pub fn as_str(&self) -> &str {
118        &self.0
119    }
120}
121
122impl fmt::Display for ProcessExecutionEnvRef {
123    fn fmt(&self, formatter: &mut fmt::Formatter<'_>) -> fmt::Result {
124        formatter.write_str(&self.0)
125    }
126}
127
128#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
129pub struct ProcessExecutionEnvSpec {
130    #[serde(default)]
131    pub plugin_options: crate::PluginOptions,
132    #[serde(default)]
133    pub policy: crate::SessionPolicy,
134}
135
136impl ProcessExecutionEnvSpec {
137    pub fn new(plugin_options: crate::PluginOptions, policy: crate::SessionPolicy) -> Self {
138        Self {
139            plugin_options,
140            policy,
141        }
142    }
143
144    pub fn stable_ref(&self) -> Result<ProcessExecutionEnvRef, serde_json::Error> {
145        crate::stable_hash::stable_json_sha256_hex(self)
146            .map(|hash| ProcessExecutionEnvRef::new(format!("process-env:sha256:{hash}")))
147    }
148
149    pub fn to_store_bytes(&self) -> Result<Vec<u8>, serde_json::Error> {
150        serde_json::to_vec(self)
151    }
152
153    pub fn from_store_bytes(bytes: &[u8]) -> Result<Self, serde_json::Error> {
154        serde_json::from_slice(bytes)
155    }
156}
157
158pub async fn persist_process_execution_env(
159    artifact_store: &dyn lashlang::LashlangArtifactStore,
160    spec: &ProcessExecutionEnvSpec,
161) -> Result<ProcessExecutionEnvRef, crate::PluginError> {
162    let env_ref = spec.stable_ref().map_err(|err| {
163        crate::PluginError::Session(format!("failed to hash process execution env: {err}"))
164    })?;
165    let bytes = spec.to_store_bytes().map_err(|err| {
166        crate::PluginError::Session(format!("failed to encode process execution env: {err}"))
167    })?;
168    artifact_store
169        .put_artifact_bytes(env_ref.as_str(), "process_execution_env", &bytes)
170        .await
171        .map_err(|err| {
172            crate::PluginError::Session(format!(
173                "failed to persist process execution env `{env_ref}`: {err}"
174            ))
175        })?;
176    Ok(env_ref)
177}
178
179pub async fn load_process_execution_env(
180    artifact_store: &dyn lashlang::LashlangArtifactStore,
181    env_ref: &ProcessExecutionEnvRef,
182) -> Result<ProcessExecutionEnvSpec, crate::PluginError> {
183    let bytes = artifact_store
184        .get_artifact_bytes(env_ref.as_str())
185        .await
186        .map_err(|err| {
187            crate::PluginError::Session(format!(
188                "failed to load process execution env `{env_ref}`: {err}"
189            ))
190        })?
191        .ok_or_else(|| {
192            crate::PluginError::Session(format!("missing process execution env `{env_ref}`"))
193        })?;
194    ProcessExecutionEnvSpec::from_store_bytes(&bytes).map_err(|err| {
195        crate::PluginError::Session(format!(
196            "failed to decode process execution env `{env_ref}`: {err}"
197        ))
198    })
199}
200
/// Execution-local context for process runners. Durable edges live on the
/// process record.
#[derive(Clone, Debug, Default, Serialize, Deserialize)]
pub struct ProcessExecutionContext {
    /// Invocation associated with this execution, if any. Filled from the op
    /// scope's parent invocation by `ProcessStartOptions::execution_context`;
    /// omitted from serialized output when `None`.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub causal_invocation: Option<crate::RuntimeInvocation>,
}
208
209impl ProcessExecutionContext {
210    pub fn with_causal_invocation(mut self, invocation: Option<crate::RuntimeInvocation>) -> Self {
211        self.causal_invocation = invocation;
212        self
213    }
214
215    pub fn is_empty(&self) -> bool {
216        self.causal_invocation.is_none()
217    }
218}
219
220#[derive(Clone)]
221pub struct ProcessOpScope<'scope> {
222    pub(crate) parent_invocation: Option<crate::RuntimeInvocation>,
223    pub(crate) effect_controller: crate::runtime::RuntimeEffectControllerHandle<'scope>,
224    pub(crate) agent_frame_id: Option<crate::AgentFrameId>,
225    pub(crate) target_agent_frame_id: Option<crate::AgentFrameId>,
226}
227
228impl<'scope> ProcessOpScope<'scope> {
229    pub fn new(scoped_effect_controller: crate::ScopedEffectController<'scope>) -> Self {
230        Self {
231            parent_invocation: None,
232            effect_controller: crate::runtime::RuntimeEffectControllerHandle::borrowed(
233                scoped_effect_controller,
234            ),
235            agent_frame_id: None,
236            target_agent_frame_id: None,
237        }
238    }
239
240    pub fn with_parent_invocation(
241        mut self,
242        parent_invocation: Option<crate::RuntimeInvocation>,
243    ) -> Self {
244        self.parent_invocation = parent_invocation;
245        self
246    }
247
248    pub fn with_agent_frame_id(mut self, agent_frame_id: Option<crate::AgentFrameId>) -> Self {
249        self.agent_frame_id = agent_frame_id;
250        self
251    }
252
253    pub fn with_target_agent_frame_id(
254        mut self,
255        agent_frame_id: Option<crate::AgentFrameId>,
256    ) -> Self {
257        self.target_agent_frame_id = agent_frame_id;
258        self
259    }
260
261    pub fn agent_frame_id(&self) -> Option<&str> {
262        self.agent_frame_id.as_deref()
263    }
264
265    pub fn target_agent_frame_id(&self) -> Option<&str> {
266        self.target_agent_frame_id.as_deref()
267    }
268
269    pub(crate) fn controller(&self) -> &dyn crate::RuntimeEffectController {
270        self.effect_controller.controller()
271    }
272}
273
/// Options accompanying a process start. Both fields default to `None`.
#[derive(Clone, Debug, Default)]
pub struct ProcessStartOptions {
    /// Optional descriptor (kind and label) for the process handle.
    pub descriptor: Option<ProcessHandleDescriptor>,
    /// Runtime-internal spawn provenance override. Set by process execution
    /// contexts so children started *by a process* inherit the parent's
    /// originator and wake target instead of being stamped with the ephemeral
    /// execution scope. `None` means the session start path stamps the
    /// creating session (the in-session meaning of "start"). This rides
    /// options — not the request — so in-session callers cannot forge
    /// provenance through the session surface.
    pub spawn_provenance: Option<ProcessSpawnProvenance>,
}
286
/// Provenance a process-run context hands to its children: the chain's
/// originator and the chain's wake target. Mirrors the trigger fire path,
/// where the spawned process inherits the registrant and the grant is derived
/// from the wake target.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct ProcessSpawnProvenance {
    /// Originator of the spawn chain, inherited by children.
    pub originator: ProcessOriginator,
    /// Wake target of the spawn chain, if any.
    pub wake_target: Option<SessionScope>,
}
296
297impl ProcessStartOptions {
298    pub fn new() -> Self {
299        Self::default()
300    }
301
302    pub fn with_descriptor(mut self, descriptor: ProcessHandleDescriptor) -> Self {
303        self.descriptor = Some(descriptor);
304        self
305    }
306
307    pub fn with_optional_descriptor(mut self, descriptor: Option<ProcessHandleDescriptor>) -> Self {
308        self.descriptor = descriptor;
309        self
310    }
311
312    pub fn with_spawn_provenance(mut self, spawn_provenance: ProcessSpawnProvenance) -> Self {
313        self.spawn_provenance = Some(spawn_provenance);
314        self
315    }
316
317    pub fn execution_context(&self, scope: &ProcessOpScope<'_>) -> ProcessExecutionContext {
318        ProcessExecutionContext {
319            causal_invocation: scope.parent_invocation.clone(),
320        }
321    }
322}
323
324/// Public host-facing request for starting a visible process handle.
325#[derive(Clone, Debug, Serialize, Deserialize)]
326pub struct ProcessStartRequest {
327    pub id: ProcessId,
328    pub input: ProcessInput,
329    #[serde(default, skip_serializing_if = "Option::is_none")]
330    pub env_spec: Option<ProcessExecutionEnvSpec>,
331    pub originator: ProcessOriginator,
332    #[serde(default, skip_serializing_if = "Option::is_none")]
333    pub wake_target: Option<SessionScope>,
334    #[serde(default, skip_serializing_if = "Option::is_none")]
335    pub grant: Option<ProcessStartGrant>,
336    #[serde(default)]
337    pub event_types: Vec<ProcessEventType>,
338}
339
340impl ProcessStartRequest {
341    pub fn new(
342        id: impl Into<ProcessId>,
343        input: ProcessInput,
344        originator: ProcessOriginator,
345    ) -> Self {
346        Self {
347            id: id.into(),
348            input,
349            env_spec: None,
350            originator,
351            wake_target: None,
352            grant: None,
353            event_types: default_process_event_types(),
354        }
355    }
356
357    pub fn external(
358        id: impl Into<ProcessId>,
359        originator: ProcessOriginator,
360        metadata: serde_json::Value,
361    ) -> Self {
362        Self::new(id, ProcessInput::External { metadata }, originator)
363    }
364
365    pub fn with_env_spec(mut self, env_spec: ProcessExecutionEnvSpec) -> Self {
366        self.env_spec = Some(env_spec);
367        self
368    }
369
370    pub fn with_wake_target(mut self, wake_target: Option<SessionScope>) -> Self {
371        self.wake_target = wake_target;
372        self
373    }
374
375    pub fn with_grant(mut self, grant: Option<ProcessStartGrant>) -> Self {
376        self.grant = grant;
377        self
378    }
379
380    pub fn with_event_types(
381        mut self,
382        event_types: impl IntoIterator<Item = ProcessEventType>,
383    ) -> Self {
384        self.event_types = event_types.into_iter().collect();
385        self
386    }
387
388    pub fn with_extra_event_types(
389        mut self,
390        event_types: impl IntoIterator<Item = ProcessEventType>,
391    ) -> Self {
392        self.event_types.extend(event_types);
393        self
394    }
395
396    pub fn into_registration(
397        self,
398        host_profile_id: impl Into<String>,
399        env_ref: Option<ProcessExecutionEnvRef>,
400    ) -> ProcessRegistration {
401        ProcessRegistration::new(
402            self.id,
403            self.input,
404            ProcessProvenance::new(self.originator, host_profile_id),
405        )
406        .with_event_types(self.event_types)
407        .with_execution_env_ref(env_ref)
408        .with_wake_target(self.wake_target)
409    }
410}
411
/// Identifies a session and, optionally, one agent frame within it.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub struct SessionScope {
    /// Identifier of the session.
    pub session_id: String,
    /// Optional agent frame within the session; omitted from serialized
    /// output when `None`.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub agent_frame_id: Option<crate::AgentFrameId>,
}
418
419#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
420pub struct ProcessProvenance {
421    pub originator: ProcessOriginator,
422    pub host_profile_id: String,
423    #[serde(default, skip_serializing_if = "Option::is_none")]
424    pub caused_by: Option<crate::CausalRef>,
425}
426
427impl ProcessProvenance {
428    pub fn new(originator: ProcessOriginator, host_profile_id: impl Into<String>) -> Self {
429        Self {
430            originator,
431            host_profile_id: host_profile_id.into(),
432            caused_by: None,
433        }
434    }
435
436    pub fn host(host_profile_id: impl Into<String>) -> Self {
437        Self::new(ProcessOriginator::host(), host_profile_id)
438    }
439
440    pub fn session(scope: SessionScope, host_profile_id: impl Into<String>) -> Self {
441        Self::new(ProcessOriginator::session(scope), host_profile_id)
442    }
443
444    pub fn with_caused_by(mut self, caused_by: Option<crate::CausalRef>) -> Self {
445        self.caused_by = caused_by;
446        self
447    }
448}
449
/// Originator of a process: either the host or a session scope.
///
/// Serialized as an internally tagged enum: the `type` field carries the
/// snake_case variant name.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
#[serde(tag = "type", rename_all = "snake_case")]
pub enum ProcessOriginator {
    /// The process was originated by the host.
    Host,
    /// The process was originated from the given session scope.
    Session { scope: SessionScope },
}
456
457impl ProcessOriginator {
458    pub fn host() -> Self {
459        Self::Host
460    }
461
462    pub fn session(scope: SessionScope) -> Self {
463        Self::Session { scope }
464    }
465
466    pub fn scope_id(&self) -> String {
467        match self {
468            Self::Host => "host".to_string(),
469            Self::Session { scope } => scope.id().to_string(),
470        }
471    }
472}
473
474impl SessionScope {
475    pub fn new(session_id: impl Into<String>) -> Self {
476        Self {
477            session_id: session_id.into(),
478            agent_frame_id: None,
479        }
480    }
481
482    pub fn for_agent_frame(
483        session_id: impl Into<String>,
484        agent_frame_id: impl Into<crate::AgentFrameId>,
485    ) -> Self {
486        Self {
487            session_id: session_id.into(),
488            agent_frame_id: Some(agent_frame_id.into()),
489        }
490    }
491
492    pub fn id(&self) -> SessionScopeId {
493        match self.agent_frame_id.as_deref() {
494            Some(frame_id) if !frame_id.is_empty() => {
495                SessionScopeId::new(format!("session:{}/frame:{frame_id}", self.session_id))
496            }
497            _ => SessionScopeId::new(format!("session:{}", self.session_id)),
498        }
499    }
500
501    pub fn is_empty(&self) -> bool {
502        self.session_id.is_empty()
503    }
504}
505
506/// Serializable process spec used to start or recover a runtime process.
507#[derive(Debug, Serialize, Deserialize)]
508pub struct ProcessRegistration {
509    pub id: ProcessId,
510    pub input: Arc<ProcessInput>,
511    #[serde(default)]
512    pub event_types: Vec<ProcessEventType>,
513    pub provenance: ProcessProvenance,
514    #[serde(default, skip_serializing_if = "Option::is_none")]
515    pub env_ref: Option<ProcessExecutionEnvRef>,
516    #[serde(default, skip_serializing_if = "Option::is_none")]
517    pub wake_target: Option<SessionScope>,
518}
519
520impl Clone for ProcessRegistration {
521    fn clone(&self) -> Self {
522        Self {
523            id: self.id.clone(),
524            input: Arc::clone(&self.input),
525            event_types: self.event_types.clone(),
526            provenance: self.provenance.clone(),
527            env_ref: self.env_ref.clone(),
528            wake_target: self.wake_target.clone(),
529        }
530    }
531}
532
533impl ProcessRegistration {
534    pub fn new(
535        id: impl Into<ProcessId>,
536        input: ProcessInput,
537        provenance: ProcessProvenance,
538    ) -> Self {
539        Self {
540            id: id.into(),
541            input: Arc::new(input),
542            event_types: default_process_event_types(),
543            provenance,
544            env_ref: None,
545            wake_target: None,
546        }
547    }
548
549    pub(crate) fn session_start_draft(id: impl Into<ProcessId>, input: ProcessInput) -> Self {
550        Self::new(id, input, ProcessProvenance::host("session-start-draft"))
551    }
552
553    pub fn with_process_provenance(mut self, provenance: ProcessProvenance) -> Self {
554        self.provenance = provenance;
555        self
556    }
557
558    pub fn with_execution_env_ref(mut self, env_ref: Option<ProcessExecutionEnvRef>) -> Self {
559        self.env_ref = env_ref;
560        self
561    }
562
563    pub fn with_wake_target(mut self, wake_target: Option<SessionScope>) -> Self {
564        self.wake_target = wake_target;
565        self
566    }
567
568    pub fn with_event_types(
569        mut self,
570        event_types: impl IntoIterator<Item = ProcessEventType>,
571    ) -> Self {
572        self.event_types = event_types.into_iter().collect();
573        self
574    }
575
576    pub fn with_extra_event_types(
577        mut self,
578        event_types: impl IntoIterator<Item = ProcessEventType>,
579    ) -> Self {
580        self.event_types.extend(event_types);
581        self
582    }
583}
584
/// Status of a process. Every variant other than `Running` is terminal and
/// carries the process's await output.
///
/// Serialized as an internally tagged enum: the `state` field carries the
/// snake_case variant name. The default is `Running`.
#[derive(Clone, Debug, Default, PartialEq, Serialize, Deserialize)]
#[serde(tag = "state", rename_all = "snake_case")]
pub enum ProcessStatus {
    /// Not yet terminal.
    #[default]
    Running,
    /// Terminal: completed.
    Completed {
        await_output: ProcessAwaitOutput,
    },
    /// Terminal: failed.
    Failed {
        await_output: ProcessAwaitOutput,
    },
    /// Terminal: cancelled.
    Cancelled {
        await_output: ProcessAwaitOutput,
    },
}
600
601impl ProcessStatus {
602    pub fn from_terminal(terminal: ProcessTerminalSemantics) -> Self {
603        match terminal.state {
604            ProcessTerminalState::Completed => Self::Completed {
605                await_output: terminal.await_output,
606            },
607            ProcessTerminalState::Failed => Self::Failed {
608                await_output: terminal.await_output,
609            },
610            ProcessTerminalState::Cancelled => Self::Cancelled {
611                await_output: terminal.await_output,
612            },
613        }
614    }
615
616    pub fn is_terminal(&self) -> bool {
617        !matches!(self, Self::Running)
618    }
619
620    pub fn label(&self) -> &'static str {
621        match self {
622            Self::Running => "running",
623            Self::Completed { .. } => "completed",
624            Self::Failed { .. } => "failed",
625            Self::Cancelled { .. } => "cancelled",
626        }
627    }
628
629    pub fn terminal_state(&self) -> Option<ProcessTerminalState> {
630        match self {
631            Self::Running => None,
632            Self::Completed { .. } => Some(ProcessTerminalState::Completed),
633            Self::Failed { .. } => Some(ProcessTerminalState::Failed),
634            Self::Cancelled { .. } => Some(ProcessTerminalState::Cancelled),
635        }
636    }
637
638    pub fn await_output(&self) -> Option<&ProcessAwaitOutput> {
639        match self {
640            Self::Running => None,
641            Self::Completed { await_output }
642            | Self::Failed { await_output }
643            | Self::Cancelled { await_output } => Some(await_output),
644        }
645    }
646
647    pub fn terminal_semantics(&self) -> Option<ProcessTerminalSemantics> {
648        Some(ProcessTerminalSemantics {
649            state: self.terminal_state()?,
650            await_output: self.await_output()?.clone(),
651        })
652    }
653}
654
/// Durable process row. Session-visible addressability lives in
/// [`ProcessHandleGrant`], not in the process record.
///
/// Fields marked `#[serde(default)]` fall back to their `Default` when absent
/// from serialized input; optional fields are omitted from serialized output
/// when `None`.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct ProcessRecord {
    pub id: ProcessId,
    /// Hash of the registration this record was built from.
    pub registration_hash: String,
    pub input: Arc<ProcessInput>,
    #[serde(default)]
    pub event_types: Vec<ProcessEventType>,
    pub provenance: ProcessProvenance,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub env_ref: Option<ProcessExecutionEnvRef>,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub wake_target: Option<SessionScope>,
    /// Creation timestamp; set to the construction time by
    /// `from_prepared_registration`.
    #[serde(default)]
    pub created_at_ms: u64,
    /// Last-update timestamp; initially equal to `created_at_ms`.
    #[serde(default)]
    pub updated_at_ms: u64,
    /// External backend reference; `None` on a freshly built record.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub external_ref: Option<ProcessExternalRef>,
    /// Current wait state; `None` on a freshly built record.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub wait: Option<WaitState>,
    /// Current status; `Running` on a freshly built record.
    #[serde(default)]
    pub status: ProcessStatus,
}
680
/// Wait state stored in a process record's `wait` field.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub struct WaitState {
    /// What is being waited on.
    pub kind: WaitKind,
    /// NOTE(review): presumably the epoch-millisecond time the wait began —
    /// confirm against the code that populates it.
    pub since_ms: u64,
}
686
/// Kind of wait recorded in a [`WaitState`].
///
/// Serialized as an internally tagged enum: the `kind` field carries the
/// snake_case variant name.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
#[serde(tag = "kind", rename_all = "snake_case")]
pub enum WaitKind {
    /// Wait on a signal, described by its name, event type, key and ordinal.
    Signal {
        name: String,
        event_type: String,
        key: String,
        ordinal: u64,
    },
}
697
698impl ProcessRecord {
699    pub fn from_registration(mut registration: ProcessRegistration) -> Self {
700        ensure_core_event_types(&mut registration);
701        validate_process_registration(&registration)
702            .expect("process registration should be valid before record construction");
703        let registration_hash = process_registration_hash(&registration)
704            .expect("process registration should hash before record construction");
705        Self::from_prepared_registration(registration, registration_hash, current_epoch_ms())
706    }
707
708    pub fn from_prepared_registration(
709        registration: ProcessRegistration,
710        registration_hash: String,
711        now_ms: u64,
712    ) -> Self {
713        Self {
714            id: registration.id,
715            registration_hash,
716            input: registration.input,
717            event_types: registration.event_types,
718            provenance: registration.provenance,
719            env_ref: registration.env_ref,
720            wake_target: registration.wake_target,
721            created_at_ms: now_ms,
722            updated_at_ms: now_ms,
723            external_ref: None,
724            wait: None,
725            status: ProcessStatus::Running,
726        }
727    }
728
729    pub fn is_terminal(&self) -> bool {
730        self.status.is_terminal()
731    }
732
733    pub fn originator_scope_id(&self) -> String {
734        self.provenance.originator.scope_id()
735    }
736
737    pub fn host_profile_id(&self) -> &str {
738        &self.provenance.host_profile_id
739    }
740}
741
/// Wire-format version stamped on every persisted [`ProcessLease`].
///
/// Bump when the on-wire shape of `ProcessLease` changes in a way that older
/// code cannot safely deserialize.
pub const PROCESS_LEASE_SCHEMA_VERSION: u32 = 1;

/// Durable lease over a non-terminal background process.
///
/// The lease pair `(owner_id, lease_token)` plus `fencing_token` are how lash
/// guarantees that one non-terminal process is re-executed by exactly one
/// worker at a time — even after a crash, even across two workers that both
/// sweep the same registry for recoverable work. The durable backend
/// (`lash-sqlite-store`) uses these to serialize concurrent claims on the same
/// `process_id`; future distributed durable backends use the *same* fields to
/// coordinate workers that don't share a file system.
///
/// **This is not single-process theatre.** The owner / fencing-token /
/// lease-token triple is the public contract that lets any backend detect and
/// reject stale writers. Treat it as load-bearing, not defensive.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct ProcessLease {
    /// Wire-format version of this lease; see [`PROCESS_LEASE_SCHEMA_VERSION`].
    pub schema_version: u32,
    /// Process the lease is held over.
    pub process_id: ProcessId,
    /// Owner half of the lease pair.
    pub owner_id: String,
    /// Token half of the lease pair.
    pub lease_token: String,
    /// Fencing token used alongside the lease pair to reject stale writers.
    pub fencing_token: u64,
    /// NOTE(review): presumably Unix epoch milliseconds — confirm.
    pub claimed_at_epoch_ms: u64,
    /// NOTE(review): presumably Unix epoch milliseconds — confirm.
    pub expires_at_epoch_ms: u64,
}
771
/// Process id and lease token identifying a lease; built from a
/// [`ProcessLease`] by `from_lease`.
///
/// NOTE(review): presumably presented when a worker finishes leased work —
/// confirm against the store API that consumes it.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct ProcessLeaseCompletion {
    pub process_id: ProcessId,
    pub lease_token: String,
}
777
778impl ProcessLeaseCompletion {
779    pub fn from_lease(lease: &ProcessLease) -> Self {
780        Self {
781            process_id: lease.process_id.clone(),
782            lease_token: lease.lease_token.clone(),
783        }
784    }
785}
786
/// Durable backend reference for background work accepted outside the local process.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq)]
pub struct ProcessExternalRef {
    /// Backend the reference belongs to.
    pub backend: String,
    /// Identifier of the work within that backend.
    pub id: String,
    /// Optional free-form JSON metadata; omitted from serialized output when
    /// `None`.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub metadata: Option<serde_json::Value>,
}
795
/// Optional kind and label describing a process handle.
///
/// Both fields default to `None` and are omitted from serialized output when
/// `None`.
#[derive(Clone, Debug, Default, PartialEq, Eq, Serialize, Deserialize)]
pub struct ProcessHandleDescriptor {
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub kind: Option<String>,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub label: Option<String>,
}
803
804impl ProcessHandleDescriptor {
805    pub fn new(kind: Option<impl Into<String>>, label: Option<impl Into<String>>) -> Self {
806        Self {
807            kind: kind.map(Into::into),
808            label: label.map(Into::into),
809        }
810    }
811}
812
/// Associates a session with a process and the descriptor of the handle
/// granted to it.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub struct ProcessHandleGrant {
    /// Session the grant belongs to.
    pub session_id: String,
    /// Process the grant refers to.
    pub process_id: ProcessId,
    /// Descriptor (kind and label) of the granted handle.
    pub descriptor: ProcessHandleDescriptor,
}
819
/// A handle grant paired with a process record; convertible into a
/// [`ProcessHandleSummary`].
pub type ProcessHandleGrantEntry = (ProcessHandleGrant, ProcessRecord);
821
/// Payload-free lifecycle status with the same variants as [`ProcessStatus`].
///
/// Serialized as the snake_case variant name. The default is `Running`.
#[derive(Clone, Copy, Debug, Default, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum ProcessLifecycleStatus {
    #[default]
    Running,
    Completed,
    Failed,
    Cancelled,
}
831
832impl ProcessLifecycleStatus {
833    pub fn label(self) -> &'static str {
834        match self {
835            Self::Running => "running",
836            Self::Completed => "completed",
837            Self::Failed => "failed",
838            Self::Cancelled => "cancelled",
839        }
840    }
841
842    pub fn is_terminal(self) -> bool {
843        !matches!(self, Self::Running)
844    }
845
846    pub fn terminal_state(self) -> Option<ProcessTerminalState> {
847        match self {
848            Self::Running => None,
849            Self::Completed => Some(ProcessTerminalState::Completed),
850            Self::Failed => Some(ProcessTerminalState::Failed),
851            Self::Cancelled => Some(ProcessTerminalState::Cancelled),
852        }
853    }
854}
855
856impl From<&ProcessStatus> for ProcessLifecycleStatus {
857    fn from(status: &ProcessStatus) -> Self {
858        match status {
859            ProcessStatus::Running => Self::Running,
860            ProcessStatus::Completed { .. } => Self::Completed,
861            ProcessStatus::Failed { .. } => Self::Failed,
862            ProcessStatus::Cancelled { .. } => Self::Cancelled,
863        }
864    }
865}
866
867impl From<ProcessStatus> for ProcessLifecycleStatus {
868    fn from(status: ProcessStatus) -> Self {
869        Self::from(&status)
870    }
871}
872
873#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
874pub struct ProcessHandleSummary {
875    #[serde(rename = "__handle__")]
876    pub handle_type: String,
877    pub id: ProcessId,
878    pub process_id: ProcessId,
879    pub descriptor: ProcessHandleDescriptor,
880    #[serde(default, skip_serializing_if = "Option::is_none")]
881    pub definition: Option<ProcessDefinitionSummary>,
882    pub status: ProcessLifecycleStatus,
883}
884
885impl ProcessHandleSummary {
886    pub fn new(
887        process_id: impl Into<ProcessId>,
888        descriptor: ProcessHandleDescriptor,
889        status: ProcessLifecycleStatus,
890    ) -> Self {
891        let process_id = process_id.into();
892        Self {
893            handle_type: "process".to_string(),
894            id: process_id.clone(),
895            process_id,
896            descriptor,
897            definition: None,
898            status,
899        }
900    }
901
902    pub fn with_definition(mut self, definition: Option<ProcessDefinitionSummary>) -> Self {
903        self.definition = definition;
904        self
905    }
906
907    pub fn from_grant_record(grant: ProcessHandleGrant, record: ProcessRecord) -> Self {
908        let definition = ProcessDefinitionSummary::from_input(record.input.as_ref());
909        Self::new(
910            record.id,
911            grant.descriptor,
912            ProcessLifecycleStatus::from(record.status),
913        )
914        .with_definition(definition)
915    }
916}
917
918impl From<ProcessHandleGrantEntry> for ProcessHandleSummary {
919    fn from((grant, record): ProcessHandleGrantEntry) -> Self {
920        Self::from_grant_record(grant, record)
921    }
922}
923
924#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
925pub struct ProcessCancelSummary {
926    pub process_id: ProcessId,
927    pub status: ProcessLifecycleStatus,
928}
929
930impl ProcessCancelSummary {
931    pub fn from_record(record: ProcessRecord) -> Self {
932        Self {
933            process_id: record.id,
934            status: ProcessLifecycleStatus::from(record.status),
935        }
936    }
937}
938
939#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
940pub struct ProcessDefinitionSummary {
941    pub name: String,
942}
943
944impl ProcessDefinitionSummary {
945    pub fn from_input(input: &ProcessInput) -> Option<Self> {
946        match input {
947            ProcessInput::LashlangProcess { process_name, .. } => Some(Self {
948                name: process_name.clone(),
949            }),
950            ProcessInput::ToolCall { .. }
951            | ProcessInput::SessionTurn { .. }
952            | ProcessInput::External { .. } => None,
953        }
954    }
955}
956
957#[derive(Clone, Debug, PartialEq, Eq)]
958pub struct ProcessDefinitionSelector {
959    module_ref: lashlang::ModuleRef,
960    required_surface_ref: lashlang::RequiredSurfaceRef,
961    process_ref: lashlang::ProcessRef,
962    process_name: String,
963}
964
965impl ProcessDefinitionSelector {
966    pub fn decode(value: &serde_json::Value) -> Result<Self, String> {
967        if value
968            .get(lashlang::LASH_PROCESS_VALUE_KEY)
969            .and_then(serde_json::Value::as_bool)
970            != Some(true)
971        {
972            return Err("definition must be a process definition value".to_string());
973        }
974        Ok(Self {
975            module_ref: decode_process_definition_field(
976                value,
977                lashlang::LASH_MODULE_REF_KEY,
978                "definition",
979            )?,
980            required_surface_ref: decode_process_definition_field(
981                value,
982                lashlang::LASH_REQUIRED_SURFACE_REF_KEY,
983                "definition",
984            )?,
985            process_ref: decode_process_definition_field(
986                value,
987                lashlang::LASH_PROCESS_REF_KEY,
988                "definition",
989            )?,
990            process_name: value
991                .get(lashlang::LASH_PROCESS_NAME_KEY)
992                .and_then(serde_json::Value::as_str)
993                .ok_or_else(|| "definition is missing its process name".to_string())?
994                .to_string(),
995        })
996    }
997
998    pub fn matches_input(&self, input: &ProcessInput) -> bool {
999        match input {
1000            ProcessInput::LashlangProcess {
1001                module_ref,
1002                process_ref,
1003                required_surface_ref,
1004                process_name,
1005                ..
1006            } => {
1007                self.module_ref == *module_ref
1008                    && self.required_surface_ref == *required_surface_ref
1009                    && self.process_ref == *process_ref
1010                    && self.process_name == *process_name
1011            }
1012            ProcessInput::ToolCall { .. }
1013            | ProcessInput::SessionTurn { .. }
1014            | ProcessInput::External { .. } => false,
1015        }
1016    }
1017}
1018
1019fn decode_process_definition_field<T: serde::de::DeserializeOwned>(
1020    value: &serde_json::Value,
1021    field: &'static str,
1022    label: &'static str,
1023) -> Result<T, String> {
1024    serde_json::from_value(
1025        value
1026            .get(field)
1027            .cloned()
1028            .ok_or_else(|| format!("{label} is missing {field}"))?,
1029    )
1030    .map_err(|err| format!("{label} has invalid {field}: {err}"))
1031}
1032
1033#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)]
1034pub enum ProcessStatusFilter {
1035    #[default]
1036    Running,
1037    Completed,
1038    Failed,
1039    Cancelled,
1040    Any,
1041}
1042
1043impl ProcessStatusFilter {
1044    pub fn decode(value: Option<&str>) -> Result<Self, String> {
1045        match value.unwrap_or("running") {
1046            "running" => Ok(Self::Running),
1047            "completed" => Ok(Self::Completed),
1048            "failed" => Ok(Self::Failed),
1049            "cancelled" => Ok(Self::Cancelled),
1050            "any" => Ok(Self::Any),
1051            other => Err(format!(
1052                "processes.list status must be `running`, `completed`, `failed`, `cancelled`, or `any`, got `{other}`"
1053            )),
1054        }
1055    }
1056
1057    pub fn list_mode(self) -> ProcessListMode {
1058        match self {
1059            Self::Running => ProcessListMode::Live,
1060            Self::Completed | Self::Failed | Self::Cancelled | Self::Any => ProcessListMode::All,
1061        }
1062    }
1063
1064    pub fn matches(self, status: ProcessLifecycleStatus) -> bool {
1065        match self {
1066            Self::Running => status == ProcessLifecycleStatus::Running,
1067            Self::Completed => status == ProcessLifecycleStatus::Completed,
1068            Self::Failed => status == ProcessLifecycleStatus::Failed,
1069            Self::Cancelled => status == ProcessLifecycleStatus::Cancelled,
1070            Self::Any => true,
1071        }
1072    }
1073}
1074
1075#[derive(Clone, Debug, Default, PartialEq, Eq)]
1076pub struct ProcessListFilter {
1077    pub definition: Option<ProcessDefinitionSelector>,
1078    pub status: ProcessStatusFilter,
1079    pub waiting: Option<bool>,
1080}
1081
1082impl ProcessListFilter {
1083    pub fn decode(args: &serde_json::Value) -> Result<Self, String> {
1084        let map = args
1085            .as_object()
1086            .ok_or_else(|| "processes.list expects a record of process filters".to_string())?;
1087        for key in map.keys() {
1088            match key.as_str() {
1089                "definition" | "status" | "waiting" => {}
1090                _ => return Err(format!("processes.list unknown filter `{key}`")),
1091            }
1092        }
1093        let definition = args
1094            .get("definition")
1095            .map(ProcessDefinitionSelector::decode)
1096            .transpose()?;
1097        let status =
1098            ProcessStatusFilter::decode(args.get("status").and_then(serde_json::Value::as_str))?;
1099        let waiting = args
1100            .get("waiting")
1101            .map(|value| {
1102                value
1103                    .as_bool()
1104                    .ok_or_else(|| "processes.list `waiting` filter must be a boolean".to_string())
1105            })
1106            .transpose()?;
1107        Ok(Self {
1108            definition,
1109            status,
1110            waiting,
1111        })
1112    }
1113
1114    pub fn list_mode(&self) -> ProcessListMode {
1115        self.status.list_mode()
1116    }
1117
1118    pub fn matches_entry(&self, entry: &ProcessHandleGrantEntry) -> bool {
1119        let (_grant, record) = entry;
1120        self.matches_record(record)
1121    }
1122
1123    pub fn matches_record(&self, record: &ProcessRecord) -> bool {
1124        let status = ProcessLifecycleStatus::from(&record.status);
1125        self.status.matches(status)
1126            && self
1127                .definition
1128                .as_ref()
1129                .is_none_or(|definition| definition.matches_input(record.input.as_ref()))
1130            && self
1131                .waiting
1132                .is_none_or(|waiting| record.wait.is_some() == waiting)
1133    }
1134}
1135
1136#[derive(Clone, Copy, Debug, Default, PartialEq, Eq, Serialize, Deserialize)]
1137#[serde(rename_all = "snake_case")]
1138pub enum ProcessListMode {
1139    #[default]
1140    Live,
1141    All,
1142}
1143
1144impl ProcessListMode {
1145    pub fn as_str(self) -> &'static str {
1146        match self {
1147            Self::Live => "live",
1148            Self::All => "all",
1149        }
1150    }
1151}
1152
/// Pairs a session scope with a process handle descriptor.
///
/// NOTE(review): presumably the grant attached when a process is started; confirm against
/// the callers, which are not visible in this part of the file.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub struct ProcessStartGrant {
    /// Session scope the grant applies to.
    pub session_scope: SessionScope,
    /// Descriptor for the process handle.
    pub descriptor: ProcessHandleDescriptor,
}
1158
1159#[derive(Clone, Debug, Default, PartialEq, Eq, Serialize, Deserialize)]
1160pub struct ProcessSessionDeleteReport {
1161    pub session_id: String,
1162    pub revoked_handle_count: usize,
1163    pub deleted_wake_count: usize,
1164    pub orphaned_process_ids: Vec<String>,
1165    pub preserved_process_ids: Vec<String>,
1166}
1167
#[cfg(test)]
mod tests {
    use serde_json::json;

    use super::*;

    /// Builds a process ref for `component` at position `pos`.
    ///
    /// `pos` is taken as `u32` so no narrowing cast is needed.
    fn process_ref(component: &str, pos: u32) -> lashlang::ProcessRef {
        lashlang::ProcessRef {
            component: lashlang::ContentHash::new(component),
            pos,
        }
    }

    /// Builds the JSON process-definition value that `ProcessDefinitionSelector::decode`
    /// accepts.
    fn process_value(
        module_ref: &lashlang::ModuleRef,
        surface_ref: &lashlang::RequiredSurfaceRef,
        process_ref: &lashlang::ProcessRef,
        name: &str,
    ) -> serde_json::Value {
        let mut value = serde_json::Map::new();
        value.insert(lashlang::LASH_PROCESS_VALUE_KEY.to_string(), json!(true));
        value.insert(lashlang::LASH_MODULE_REF_KEY.to_string(), json!(module_ref));
        value.insert(
            lashlang::LASH_REQUIRED_SURFACE_REF_KEY.to_string(),
            json!(surface_ref),
        );
        value.insert(
            lashlang::LASH_PROCESS_REF_KEY.to_string(),
            json!(process_ref),
        );
        value.insert(lashlang::LASH_PROCESS_NAME_KEY.to_string(), json!(name));
        serde_json::Value::Object(value)
    }

    /// A `Completed` status wrapping a successful `true` tool output.
    fn completed_status() -> ProcessStatus {
        ProcessStatus::Completed {
            await_output: ProcessAwaitOutput::from_tool_output(crate::ToolCallOutput::success(
                json!(true),
            )),
        }
    }

    /// Builds a `(grant, record)` entry for a Lashlang process with the given status.
    fn lashlang_entry(
        process_id: &str,
        module_ref: lashlang::ModuleRef,
        surface_ref: lashlang::RequiredSurfaceRef,
        process_ref: lashlang::ProcessRef,
        process_name: &str,
        status: ProcessStatus,
    ) -> ProcessHandleGrantEntry {
        let mut record = ProcessRecord::from_registration(
            ProcessRegistration::new(
                process_id,
                ProcessInput::LashlangProcess {
                    module_ref,
                    process_ref,
                    required_surface_ref: surface_ref,
                    process_name: process_name.to_string(),
                    args: serde_json::Map::new(),
                },
                ProcessProvenance::host("model-test-host"),
            )
            .with_execution_env_ref(Some(ProcessExecutionEnvRef::new(format!(
                "process-env:test:{process_id}"
            )))),
        );
        record.status = status;
        (
            ProcessHandleGrant {
                session_id: "session".to_string(),
                process_id: process_id.to_string(),
                descriptor: ProcessHandleDescriptor::new(Some("lashlang"), Some(process_name)),
            },
            record,
        )
    }

    #[test]
    fn process_list_filter_matches_definition_and_status() {
        let module_ref = lashlang::ModuleRef::new(&lashlang::ContentHash::new("module"));
        let surface_ref = lashlang::RequiredSurfaceRef::new(&lashlang::ContentHash::new("surface"));
        let target_ref = process_ref("target", 0);
        let other_ref = process_ref("other", 1);
        let filter = ProcessListFilter::decode(&json!({
            "definition": process_value(&module_ref, &surface_ref, &target_ref, "target"),
            "status": "completed"
        }))
        .expect("decode filter");

        let matching = lashlang_entry(
            "matching",
            module_ref.clone(),
            surface_ref.clone(),
            target_ref,
            "target",
            completed_status(),
        );
        let wrong_definition = lashlang_entry(
            "wrong-definition",
            module_ref,
            surface_ref,
            other_ref,
            "other",
            completed_status(),
        );

        assert_eq!(filter.list_mode(), ProcessListMode::All);
        assert!(filter.matches_entry(&matching));
        assert!(!filter.matches_entry(&wrong_definition));
    }

    #[test]
    fn process_list_filter_matches_waiting_facet() {
        let module_ref = lashlang::ModuleRef::new(&lashlang::ContentHash::new("module"));
        let surface_ref = lashlang::RequiredSurfaceRef::new(&lashlang::ContentHash::new("surface"));
        // Named `target_ref` so the local does not shadow the `process_ref` helper.
        let target_ref = process_ref("target", 0);
        let mut waiting_entry = lashlang_entry(
            "waiting",
            module_ref.clone(),
            surface_ref.clone(),
            target_ref.clone(),
            "target",
            ProcessStatus::Running,
        );
        waiting_entry.1.wait = Some(WaitState {
            since_ms: 42,
            kind: WaitKind::Signal {
                name: "ready".to_string(),
                event_type: "signal.ready".to_string(),
                key: "process:waiting:signal.ready:1".to_string(),
                ordinal: 1,
            },
        });
        let idle_entry = lashlang_entry(
            "idle",
            module_ref,
            surface_ref,
            target_ref,
            "target",
            ProcessStatus::Running,
        );
        let waiting_filter =
            ProcessListFilter::decode(&json!({ "waiting": true })).expect("decode waiting filter");
        let idle_filter =
            ProcessListFilter::decode(&json!({ "waiting": false })).expect("decode idle filter");

        assert_eq!(waiting_filter.list_mode(), ProcessListMode::Live);
        assert!(waiting_filter.matches_entry(&waiting_entry));
        assert!(!waiting_filter.matches_entry(&idle_entry));
        assert!(!idle_filter.matches_entry(&waiting_entry));
        assert!(idle_filter.matches_entry(&idle_entry));
        assert!(
            ProcessListFilter::decode(&json!({ "waiting": "yes" }))
                .expect_err("invalid waiting filter")
                .contains("must be a boolean")
        );
    }

    #[test]
    fn process_list_filter_rejects_unknown_keys_and_non_record_args() {
        assert!(
            ProcessListFilter::decode(&json!({ "bogus": true }))
                .expect_err("unknown filter key")
                .contains("unknown filter `bogus`")
        );
        assert!(
            ProcessListFilter::decode(&json!("running"))
                .expect_err("non-record args")
                .contains("expects a record")
        );

        // An empty record decodes to the default filter.
        let defaults = ProcessListFilter::decode(&json!({})).expect("decode empty filter");
        assert_eq!(defaults, ProcessListFilter::default());
    }
}