Skip to main content

lash_core/runtime/process/
model.rs

1use std::fmt;
2use std::sync::Arc;
3
4use serde::{Deserialize, Serialize};
5
6use super::events::{
7    ProcessAwaitOutput, ProcessEventType, ProcessTerminalSemantics, ProcessTerminalState,
8    default_process_event_types,
9};
10use super::time::current_epoch_ms;
11use super::validation::{
12    ensure_core_event_types, process_registration_hash, validate_process_registration,
13};
14
15pub type ProcessId = String;
16
17#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize)]
18#[serde(transparent)]
19pub struct SessionScopeId(String);
20
21impl SessionScopeId {
22    pub fn new(value: impl Into<String>) -> Self {
23        Self(value.into())
24    }
25
26    pub fn as_str(&self) -> &str {
27        &self.0
28    }
29}
30
31impl fmt::Display for SessionScopeId {
32    fn fmt(&self, formatter: &mut fmt::Formatter<'_>) -> fmt::Result {
33        formatter.write_str(&self.0)
34    }
35}
36
37impl From<String> for SessionScopeId {
38    fn from(value: String) -> Self {
39        Self::new(value)
40    }
41}
42
43impl From<&str> for SessionScopeId {
44    fn from(value: &str) -> Self {
45        Self::new(value)
46    }
47}
48
49/// Durable executable input for a process.
50#[derive(Debug, Serialize, Deserialize)]
51#[serde(tag = "type", rename_all = "snake_case")]
52pub enum ProcessInput {
53    ToolCall {
54        call: crate::PreparedToolCall,
55    },
56    LashlangProcess {
57        module_ref: lashlang::ModuleRef,
58        process_ref: lashlang::ProcessRef,
59        host_requirements_ref: lashlang::HostRequirementsRef,
60        process_name: String,
61        #[serde(default)]
62        args: serde_json::Map<String, serde_json::Value>,
63    },
64    SessionTurn {
65        create_request: Box<crate::SessionCreateRequest>,
66        turn_input: Box<crate::TurnInput>,
67        output_contract: crate::ToolOutputContract,
68    },
69    External {
70        #[serde(default)]
71        metadata: serde_json::Value,
72    },
73}
74
75impl Clone for ProcessInput {
76    fn clone(&self) -> Self {
77        match self {
78            Self::ToolCall { call } => Self::ToolCall { call: call.clone() },
79            Self::LashlangProcess {
80                module_ref,
81                process_ref,
82                host_requirements_ref,
83                process_name,
84                args,
85            } => Self::LashlangProcess {
86                module_ref: module_ref.clone(),
87                process_ref: process_ref.clone(),
88                host_requirements_ref: host_requirements_ref.clone(),
89                process_name: process_name.clone(),
90                args: args.clone(),
91            },
92            Self::SessionTurn {
93                create_request,
94                turn_input,
95                output_contract,
96            } => Self::SessionTurn {
97                create_request: create_request.clone(),
98                turn_input: turn_input.clone(),
99                output_contract: output_contract.clone(),
100            },
101            Self::External { metadata } => Self::External {
102                metadata: metadata.clone(),
103            },
104        }
105    }
106}
107
108#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
109#[serde(transparent)]
110pub struct ProcessExecutionEnvRef(String);
111
112impl ProcessExecutionEnvRef {
113    pub fn new(value: impl Into<String>) -> Self {
114        Self(value.into())
115    }
116
117    pub fn as_str(&self) -> &str {
118        &self.0
119    }
120}
121
122impl fmt::Display for ProcessExecutionEnvRef {
123    fn fmt(&self, formatter: &mut fmt::Formatter<'_>) -> fmt::Result {
124        formatter.write_str(&self.0)
125    }
126}
127
128#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
129#[serde(deny_unknown_fields)]
130pub struct ProcessExecutionEnvSpec {
131    #[serde(default)]
132    pub plugin_options: crate::PluginOptions,
133    #[serde(default)]
134    pub policy: crate::SessionPolicy,
135}
136
137impl ProcessExecutionEnvSpec {
138    pub fn new(plugin_options: crate::PluginOptions, policy: crate::SessionPolicy) -> Self {
139        Self {
140            plugin_options,
141            policy,
142        }
143    }
144
145    pub fn stable_ref(&self) -> Result<ProcessExecutionEnvRef, serde_json::Error> {
146        crate::stable_hash::stable_json_sha256_hex(self)
147            .map(|hash| ProcessExecutionEnvRef::new(format!("process-env:sha256:{hash}")))
148    }
149
150    pub fn to_store_bytes(&self) -> Result<Vec<u8>, serde_json::Error> {
151        serde_json::to_vec(self)
152    }
153
154    pub fn from_store_bytes(bytes: &[u8]) -> Result<Self, serde_json::Error> {
155        serde_json::from_slice(bytes)
156    }
157}
158
159pub async fn persist_process_execution_env(
160    artifact_store: &dyn lashlang::LashlangArtifactStore,
161    spec: &ProcessExecutionEnvSpec,
162) -> Result<ProcessExecutionEnvRef, crate::PluginError> {
163    let env_ref = spec.stable_ref().map_err(|err| {
164        crate::PluginError::Session(format!("failed to hash process execution env: {err}"))
165    })?;
166    let bytes = spec.to_store_bytes().map_err(|err| {
167        crate::PluginError::Session(format!("failed to encode process execution env: {err}"))
168    })?;
169    artifact_store
170        .put_artifact_bytes(env_ref.as_str(), "process_execution_env", &bytes)
171        .await
172        .map_err(|err| {
173            crate::PluginError::Session(format!(
174                "failed to persist process execution env `{env_ref}`: {err}"
175            ))
176        })?;
177    Ok(env_ref)
178}
179
180pub async fn load_process_execution_env(
181    artifact_store: &dyn lashlang::LashlangArtifactStore,
182    env_ref: &ProcessExecutionEnvRef,
183) -> Result<ProcessExecutionEnvSpec, crate::PluginError> {
184    let bytes = artifact_store
185        .get_artifact_bytes(env_ref.as_str())
186        .await
187        .map_err(|err| {
188            crate::PluginError::Session(format!(
189                "failed to load process execution env `{env_ref}`: {err}"
190            ))
191        })?
192        .ok_or_else(|| {
193            crate::PluginError::Session(format!("missing process execution env `{env_ref}`"))
194        })?;
195    ProcessExecutionEnvSpec::from_store_bytes(&bytes).map_err(|err| {
196        crate::PluginError::Session(format!(
197            "failed to decode process execution env `{env_ref}`: {err}"
198        ))
199    })
200}
201
202/// Execution-local context for process runners. Durable edges live on the
203/// process record.
204#[derive(Clone, Debug, Default, Serialize, Deserialize)]
205pub struct ProcessExecutionContext {
206    #[serde(default, skip_serializing_if = "Option::is_none")]
207    pub causal_invocation: Option<crate::RuntimeInvocation>,
208}
209
210impl ProcessExecutionContext {
211    pub fn with_causal_invocation(mut self, invocation: Option<crate::RuntimeInvocation>) -> Self {
212        self.causal_invocation = invocation;
213        self
214    }
215
216    pub fn is_empty(&self) -> bool {
217        self.causal_invocation.is_none()
218    }
219}
220
221#[derive(Clone)]
222pub struct ProcessOpScope<'scope> {
223    pub(crate) parent_invocation: Option<crate::RuntimeInvocation>,
224    pub(crate) effect_controller: crate::runtime::RuntimeEffectControllerHandle<'scope>,
225    pub(crate) agent_frame_id: Option<crate::AgentFrameId>,
226    pub(crate) target_agent_frame_id: Option<crate::AgentFrameId>,
227}
228
229impl<'scope> ProcessOpScope<'scope> {
230    pub fn new(scoped_effect_controller: crate::ScopedEffectController<'scope>) -> Self {
231        Self {
232            parent_invocation: None,
233            effect_controller: crate::runtime::RuntimeEffectControllerHandle::borrowed(
234                scoped_effect_controller,
235            ),
236            agent_frame_id: None,
237            target_agent_frame_id: None,
238        }
239    }
240
241    pub fn with_parent_invocation(
242        mut self,
243        parent_invocation: Option<crate::RuntimeInvocation>,
244    ) -> Self {
245        self.parent_invocation = parent_invocation;
246        self
247    }
248
249    pub fn with_agent_frame_id(mut self, agent_frame_id: Option<crate::AgentFrameId>) -> Self {
250        self.agent_frame_id = agent_frame_id;
251        self
252    }
253
254    pub fn with_target_agent_frame_id(
255        mut self,
256        agent_frame_id: Option<crate::AgentFrameId>,
257    ) -> Self {
258        self.target_agent_frame_id = agent_frame_id;
259        self
260    }
261
262    pub fn agent_frame_id(&self) -> Option<&str> {
263        self.agent_frame_id.as_deref()
264    }
265
266    pub fn target_agent_frame_id(&self) -> Option<&str> {
267        self.target_agent_frame_id.as_deref()
268    }
269
270    pub(crate) fn controller(&self) -> &dyn crate::RuntimeEffectController {
271        self.effect_controller.controller()
272    }
273}
274
275#[derive(Clone, Debug, Default)]
276pub struct ProcessStartOptions {
277    pub descriptor: Option<ProcessHandleDescriptor>,
278    /// Runtime-internal spawn provenance override. Set by process execution
279    /// contexts so children started *by a process* inherit the parent's
280    /// originator and wake target instead of being stamped with the ephemeral
281    /// execution scope. `None` means the session start path stamps the
282    /// creating session (the in-session meaning of "start"). This rides
283    /// options — not the request — so in-session callers cannot forge
284    /// provenance through the session surface.
285    pub spawn_provenance: Option<ProcessSpawnProvenance>,
286}
287
288/// Provenance a process-run context hands to its children: the chain's
289/// originator and the chain's wake target. Mirrors the trigger fire path,
290/// where the spawned process inherits the registrant and the grant is derived
291/// from the wake target.
292#[derive(Clone, Debug, PartialEq, Eq)]
293pub struct ProcessSpawnProvenance {
294    pub originator: ProcessOriginator,
295    pub wake_target: Option<SessionScope>,
296}
297
298impl ProcessStartOptions {
299    pub fn new() -> Self {
300        Self::default()
301    }
302
303    pub fn with_descriptor(mut self, descriptor: ProcessHandleDescriptor) -> Self {
304        self.descriptor = Some(descriptor);
305        self
306    }
307
308    pub fn with_optional_descriptor(mut self, descriptor: Option<ProcessHandleDescriptor>) -> Self {
309        self.descriptor = descriptor;
310        self
311    }
312
313    pub fn with_spawn_provenance(mut self, spawn_provenance: ProcessSpawnProvenance) -> Self {
314        self.spawn_provenance = Some(spawn_provenance);
315        self
316    }
317
318    pub fn execution_context(&self, scope: &ProcessOpScope<'_>) -> ProcessExecutionContext {
319        ProcessExecutionContext {
320            causal_invocation: scope.parent_invocation.clone(),
321        }
322    }
323}
324
325/// Public host-facing request for starting a visible process handle.
326#[derive(Clone, Debug, Serialize, Deserialize)]
327pub struct ProcessStartRequest {
328    pub id: ProcessId,
329    pub input: ProcessInput,
330    #[serde(default, skip_serializing_if = "Option::is_none")]
331    pub env_spec: Option<ProcessExecutionEnvSpec>,
332    pub originator: ProcessOriginator,
333    #[serde(default, skip_serializing_if = "Option::is_none")]
334    pub wake_target: Option<SessionScope>,
335    #[serde(default, skip_serializing_if = "Option::is_none")]
336    pub grant: Option<ProcessStartGrant>,
337    #[serde(default)]
338    pub event_types: Vec<ProcessEventType>,
339}
340
341impl ProcessStartRequest {
342    pub fn new(
343        id: impl Into<ProcessId>,
344        input: ProcessInput,
345        originator: ProcessOriginator,
346    ) -> Self {
347        Self {
348            id: id.into(),
349            input,
350            env_spec: None,
351            originator,
352            wake_target: None,
353            grant: None,
354            event_types: default_process_event_types(),
355        }
356    }
357
358    pub fn external(
359        id: impl Into<ProcessId>,
360        originator: ProcessOriginator,
361        metadata: serde_json::Value,
362    ) -> Self {
363        Self::new(id, ProcessInput::External { metadata }, originator)
364    }
365
366    pub fn with_env_spec(mut self, env_spec: ProcessExecutionEnvSpec) -> Self {
367        self.env_spec = Some(env_spec);
368        self
369    }
370
371    pub fn with_wake_target(mut self, wake_target: Option<SessionScope>) -> Self {
372        self.wake_target = wake_target;
373        self
374    }
375
376    pub fn with_grant(mut self, grant: Option<ProcessStartGrant>) -> Self {
377        self.grant = grant;
378        self
379    }
380
381    pub fn with_event_types(
382        mut self,
383        event_types: impl IntoIterator<Item = ProcessEventType>,
384    ) -> Self {
385        self.event_types = event_types.into_iter().collect();
386        self
387    }
388
389    pub fn with_extra_event_types(
390        mut self,
391        event_types: impl IntoIterator<Item = ProcessEventType>,
392    ) -> Self {
393        self.event_types.extend(event_types);
394        self
395    }
396
397    pub fn into_registration(self, env_ref: Option<ProcessExecutionEnvRef>) -> ProcessRegistration {
398        ProcessRegistration::new(self.id, self.input, ProcessProvenance::new(self.originator))
399            .with_event_types(self.event_types)
400            .with_execution_env_ref(env_ref)
401            .with_wake_target(self.wake_target)
402    }
403}
404
405#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
406pub struct SessionScope {
407    pub session_id: String,
408    #[serde(default, skip_serializing_if = "Option::is_none")]
409    pub agent_frame_id: Option<crate::AgentFrameId>,
410}
411
412#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
413pub struct ProcessProvenance {
414    pub originator: ProcessOriginator,
415    #[serde(default, skip_serializing_if = "Option::is_none")]
416    pub caused_by: Option<crate::CausalRef>,
417}
418
419impl ProcessProvenance {
420    pub fn new(originator: ProcessOriginator) -> Self {
421        Self {
422            originator,
423            caused_by: None,
424        }
425    }
426
427    pub fn host() -> Self {
428        Self::new(ProcessOriginator::host())
429    }
430
431    pub fn session(scope: SessionScope) -> Self {
432        Self::new(ProcessOriginator::session(scope))
433    }
434
435    pub fn with_caused_by(mut self, caused_by: Option<crate::CausalRef>) -> Self {
436        self.caused_by = caused_by;
437        self
438    }
439}
440
441#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
442#[serde(tag = "type", rename_all = "snake_case")]
443pub enum ProcessOriginator {
444    Host,
445    Session { scope: SessionScope },
446}
447
448impl ProcessOriginator {
449    pub fn host() -> Self {
450        Self::Host
451    }
452
453    pub fn session(scope: SessionScope) -> Self {
454        Self::Session { scope }
455    }
456
457    pub fn scope_id(&self) -> String {
458        match self {
459            Self::Host => "host".to_string(),
460            Self::Session { scope } => scope.id().to_string(),
461        }
462    }
463}
464
465impl SessionScope {
466    pub fn new(session_id: impl Into<String>) -> Self {
467        Self {
468            session_id: session_id.into(),
469            agent_frame_id: None,
470        }
471    }
472
473    pub fn for_agent_frame(
474        session_id: impl Into<String>,
475        agent_frame_id: impl Into<crate::AgentFrameId>,
476    ) -> Self {
477        Self {
478            session_id: session_id.into(),
479            agent_frame_id: Some(agent_frame_id.into()),
480        }
481    }
482
483    pub fn id(&self) -> SessionScopeId {
484        match self.agent_frame_id.as_deref() {
485            Some(frame_id) if !frame_id.is_empty() => {
486                SessionScopeId::new(format!("session:{}/frame:{frame_id}", self.session_id))
487            }
488            _ => SessionScopeId::new(format!("session:{}", self.session_id)),
489        }
490    }
491
492    pub fn is_empty(&self) -> bool {
493        self.session_id.is_empty()
494    }
495}
496
497/// Serializable process spec used to start or recover a runtime process.
498#[derive(Debug, Serialize, Deserialize)]
499pub struct ProcessRegistration {
500    pub id: ProcessId,
501    pub input: Arc<ProcessInput>,
502    #[serde(default)]
503    pub event_types: Vec<ProcessEventType>,
504    pub provenance: ProcessProvenance,
505    #[serde(default, skip_serializing_if = "Option::is_none")]
506    pub env_ref: Option<ProcessExecutionEnvRef>,
507    #[serde(default, skip_serializing_if = "Option::is_none")]
508    pub wake_target: Option<SessionScope>,
509}
510
511impl Clone for ProcessRegistration {
512    fn clone(&self) -> Self {
513        Self {
514            id: self.id.clone(),
515            input: Arc::clone(&self.input),
516            event_types: self.event_types.clone(),
517            provenance: self.provenance.clone(),
518            env_ref: self.env_ref.clone(),
519            wake_target: self.wake_target.clone(),
520        }
521    }
522}
523
524impl ProcessRegistration {
525    pub fn new(
526        id: impl Into<ProcessId>,
527        input: ProcessInput,
528        provenance: ProcessProvenance,
529    ) -> Self {
530        Self {
531            id: id.into(),
532            input: Arc::new(input),
533            event_types: default_process_event_types(),
534            provenance,
535            env_ref: None,
536            wake_target: None,
537        }
538    }
539
540    pub(crate) fn session_start_draft(id: impl Into<ProcessId>, input: ProcessInput) -> Self {
541        Self::new(id, input, ProcessProvenance::host())
542    }
543
544    pub fn with_process_provenance(mut self, provenance: ProcessProvenance) -> Self {
545        self.provenance = provenance;
546        self
547    }
548
549    pub fn with_execution_env_ref(mut self, env_ref: Option<ProcessExecutionEnvRef>) -> Self {
550        self.env_ref = env_ref;
551        self
552    }
553
554    pub fn with_wake_target(mut self, wake_target: Option<SessionScope>) -> Self {
555        self.wake_target = wake_target;
556        self
557    }
558
559    pub fn with_event_types(
560        mut self,
561        event_types: impl IntoIterator<Item = ProcessEventType>,
562    ) -> Self {
563        self.event_types = event_types.into_iter().collect();
564        self
565    }
566
567    pub fn with_extra_event_types(
568        mut self,
569        event_types: impl IntoIterator<Item = ProcessEventType>,
570    ) -> Self {
571        self.event_types.extend(event_types);
572        self
573    }
574}
575
576#[derive(Clone, Debug, Default, PartialEq, Serialize, Deserialize)]
577#[serde(tag = "state", rename_all = "snake_case")]
578pub enum ProcessStatus {
579    #[default]
580    Running,
581    Completed {
582        await_output: ProcessAwaitOutput,
583    },
584    Failed {
585        await_output: ProcessAwaitOutput,
586    },
587    Cancelled {
588        await_output: ProcessAwaitOutput,
589    },
590}
591
592impl ProcessStatus {
593    pub fn from_terminal(terminal: ProcessTerminalSemantics) -> Self {
594        match terminal.state {
595            ProcessTerminalState::Completed => Self::Completed {
596                await_output: terminal.await_output,
597            },
598            ProcessTerminalState::Failed => Self::Failed {
599                await_output: terminal.await_output,
600            },
601            ProcessTerminalState::Cancelled => Self::Cancelled {
602                await_output: terminal.await_output,
603            },
604        }
605    }
606
607    pub fn is_terminal(&self) -> bool {
608        !matches!(self, Self::Running)
609    }
610
611    pub fn label(&self) -> &'static str {
612        match self {
613            Self::Running => "running",
614            Self::Completed { .. } => "completed",
615            Self::Failed { .. } => "failed",
616            Self::Cancelled { .. } => "cancelled",
617        }
618    }
619
620    pub fn terminal_state(&self) -> Option<ProcessTerminalState> {
621        match self {
622            Self::Running => None,
623            Self::Completed { .. } => Some(ProcessTerminalState::Completed),
624            Self::Failed { .. } => Some(ProcessTerminalState::Failed),
625            Self::Cancelled { .. } => Some(ProcessTerminalState::Cancelled),
626        }
627    }
628
629    pub fn await_output(&self) -> Option<&ProcessAwaitOutput> {
630        match self {
631            Self::Running => None,
632            Self::Completed { await_output }
633            | Self::Failed { await_output }
634            | Self::Cancelled { await_output } => Some(await_output),
635        }
636    }
637
638    pub fn terminal_semantics(&self) -> Option<ProcessTerminalSemantics> {
639        Some(ProcessTerminalSemantics {
640            state: self.terminal_state()?,
641            await_output: self.await_output()?.clone(),
642        })
643    }
644}
645
646/// Durable process row. Session-visible addressability lives in
647/// [`ProcessHandleGrant`], not in the process record.
648#[derive(Clone, Debug, Serialize, Deserialize)]
649pub struct ProcessRecord {
650    pub id: ProcessId,
651    pub registration_hash: String,
652    pub input: Arc<ProcessInput>,
653    #[serde(default)]
654    pub event_types: Vec<ProcessEventType>,
655    pub provenance: ProcessProvenance,
656    #[serde(default, skip_serializing_if = "Option::is_none")]
657    pub env_ref: Option<ProcessExecutionEnvRef>,
658    #[serde(default, skip_serializing_if = "Option::is_none")]
659    pub wake_target: Option<SessionScope>,
660    #[serde(default)]
661    pub created_at_ms: u64,
662    #[serde(default)]
663    pub updated_at_ms: u64,
664    #[serde(default, skip_serializing_if = "Option::is_none")]
665    pub external_ref: Option<ProcessExternalRef>,
666    #[serde(default, skip_serializing_if = "Option::is_none")]
667    pub wait: Option<WaitState>,
668    #[serde(default)]
669    pub status: ProcessStatus,
670}
671
672#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
673pub struct WaitState {
674    pub kind: WaitKind,
675    pub since_ms: u64,
676}
677
678#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
679#[serde(tag = "kind", rename_all = "snake_case")]
680pub enum WaitKind {
681    Signal {
682        name: String,
683        event_type: String,
684        key: String,
685        ordinal: u64,
686    },
687}
688
689impl ProcessRecord {
690    pub fn from_registration(mut registration: ProcessRegistration) -> Self {
691        ensure_core_event_types(&mut registration);
692        validate_process_registration(&registration)
693            .expect("process registration should be valid before record construction");
694        let registration_hash = process_registration_hash(&registration)
695            .expect("process registration should hash before record construction");
696        Self::from_prepared_registration(registration, registration_hash, current_epoch_ms())
697    }
698
699    pub fn from_prepared_registration(
700        registration: ProcessRegistration,
701        registration_hash: String,
702        now_ms: u64,
703    ) -> Self {
704        Self {
705            id: registration.id,
706            registration_hash,
707            input: registration.input,
708            event_types: registration.event_types,
709            provenance: registration.provenance,
710            env_ref: registration.env_ref,
711            wake_target: registration.wake_target,
712            created_at_ms: now_ms,
713            updated_at_ms: now_ms,
714            external_ref: None,
715            wait: None,
716            status: ProcessStatus::Running,
717        }
718    }
719
720    pub fn is_terminal(&self) -> bool {
721        self.status.is_terminal()
722    }
723
724    pub fn originator_scope_id(&self) -> String {
725        self.provenance.originator.scope_id()
726    }
727}
728
729/// Wire-format version stamped on every persisted [`ProcessLease`].
730///
731/// Bump when the on-wire shape of `ProcessLease` changes in a way that older
732/// code cannot safely deserialize.
733pub const PROCESS_LEASE_SCHEMA_VERSION: u32 = 1;
734
735/// Durable lease over a non-terminal background process.
736///
737/// The lease pair `(owner_id, lease_token)` plus `fencing_token` are how lash guarantees that
738/// one non-terminal process is re-executed by exactly one worker at a time —
739/// even after a crash, even across two workers that both sweep the same
740/// registry for recoverable work. The durable backend
741/// (`lash-sqlite-store`) uses these to serialize concurrent claims on the same
742/// `process_id`; future distributed durable backends use the *same* fields to
743/// coordinate workers that don't share a file system.
744///
745/// **This is not single-process theatre.** The owner / fencing-token /
746/// lease-token triple is the public contract that lets any backend detect and
747/// reject stale writers. Treat it as load-bearing, not defensive.
748#[derive(Clone, Debug, Serialize, Deserialize)]
749pub struct ProcessLease {
750    pub schema_version: u32,
751    pub process_id: ProcessId,
752    pub owner_id: String,
753    pub lease_token: String,
754    pub fencing_token: u64,
755    pub claimed_at_epoch_ms: u64,
756    pub expires_at_epoch_ms: u64,
757}
758
759#[derive(Clone, Debug, Serialize, Deserialize)]
760pub struct ProcessLeaseCompletion {
761    pub process_id: ProcessId,
762    pub lease_token: String,
763}
764
765impl ProcessLeaseCompletion {
766    pub fn from_lease(lease: &ProcessLease) -> Self {
767        Self {
768            process_id: lease.process_id.clone(),
769            lease_token: lease.lease_token.clone(),
770        }
771    }
772}
773
774/// Durable backend reference for background work accepted outside the local process.
775#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq)]
776pub struct ProcessExternalRef {
777    pub backend: String,
778    pub id: String,
779    #[serde(default, skip_serializing_if = "Option::is_none")]
780    pub metadata: Option<serde_json::Value>,
781}
782
783#[derive(Clone, Debug, Default, PartialEq, Eq, Serialize, Deserialize)]
784pub struct ProcessHandleDescriptor {
785    #[serde(default, skip_serializing_if = "Option::is_none")]
786    pub kind: Option<String>,
787    #[serde(default, skip_serializing_if = "Option::is_none")]
788    pub label: Option<String>,
789}
790
791impl ProcessHandleDescriptor {
792    pub fn new(kind: Option<impl Into<String>>, label: Option<impl Into<String>>) -> Self {
793        Self {
794            kind: kind.map(Into::into),
795            label: label.map(Into::into),
796        }
797    }
798}
799
800#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
801pub struct ProcessHandleGrant {
802    pub session_id: String,
803    pub process_id: ProcessId,
804    pub descriptor: ProcessHandleDescriptor,
805}
806
807pub type ProcessHandleGrantEntry = (ProcessHandleGrant, ProcessRecord);
808
809#[derive(Clone, Copy, Debug, Default, PartialEq, Eq, Serialize, Deserialize)]
810#[serde(rename_all = "snake_case")]
811pub enum ProcessLifecycleStatus {
812    #[default]
813    Running,
814    Completed,
815    Failed,
816    Cancelled,
817}
818
819impl ProcessLifecycleStatus {
820    pub fn label(self) -> &'static str {
821        match self {
822            Self::Running => "running",
823            Self::Completed => "completed",
824            Self::Failed => "failed",
825            Self::Cancelled => "cancelled",
826        }
827    }
828
829    pub fn is_terminal(self) -> bool {
830        !matches!(self, Self::Running)
831    }
832
833    pub fn terminal_state(self) -> Option<ProcessTerminalState> {
834        match self {
835            Self::Running => None,
836            Self::Completed => Some(ProcessTerminalState::Completed),
837            Self::Failed => Some(ProcessTerminalState::Failed),
838            Self::Cancelled => Some(ProcessTerminalState::Cancelled),
839        }
840    }
841}
842
843impl From<&ProcessStatus> for ProcessLifecycleStatus {
844    fn from(status: &ProcessStatus) -> Self {
845        match status {
846            ProcessStatus::Running => Self::Running,
847            ProcessStatus::Completed { .. } => Self::Completed,
848            ProcessStatus::Failed { .. } => Self::Failed,
849            ProcessStatus::Cancelled { .. } => Self::Cancelled,
850        }
851    }
852}
853
854impl From<ProcessStatus> for ProcessLifecycleStatus {
855    fn from(status: ProcessStatus) -> Self {
856        Self::from(&status)
857    }
858}
859
860#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
861pub struct ProcessHandleSummary {
862    #[serde(rename = "__handle__")]
863    pub handle_type: String,
864    pub id: ProcessId,
865    pub process_id: ProcessId,
866    pub descriptor: ProcessHandleDescriptor,
867    #[serde(default, skip_serializing_if = "Option::is_none")]
868    pub definition: Option<ProcessDefinitionSummary>,
869    pub status: ProcessLifecycleStatus,
870}
871
872impl ProcessHandleSummary {
873    pub fn new(
874        process_id: impl Into<ProcessId>,
875        descriptor: ProcessHandleDescriptor,
876        status: ProcessLifecycleStatus,
877    ) -> Self {
878        let process_id = process_id.into();
879        Self {
880            handle_type: "process".to_string(),
881            id: process_id.clone(),
882            process_id,
883            descriptor,
884            definition: None,
885            status,
886        }
887    }
888
889    pub fn with_definition(mut self, definition: Option<ProcessDefinitionSummary>) -> Self {
890        self.definition = definition;
891        self
892    }
893
894    pub fn from_grant_record(grant: ProcessHandleGrant, record: ProcessRecord) -> Self {
895        let definition = ProcessDefinitionSummary::from_input(record.input.as_ref());
896        Self::new(
897            record.id,
898            grant.descriptor,
899            ProcessLifecycleStatus::from(record.status),
900        )
901        .with_definition(definition)
902    }
903}
904
905impl From<ProcessHandleGrantEntry> for ProcessHandleSummary {
906    fn from((grant, record): ProcessHandleGrantEntry) -> Self {
907        Self::from_grant_record(grant, record)
908    }
909}
910
911#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
912pub struct ProcessCancelSummary {
913    pub process_id: ProcessId,
914    pub status: ProcessLifecycleStatus,
915}
916
917impl ProcessCancelSummary {
918    pub fn from_record(record: ProcessRecord) -> Self {
919        Self {
920            process_id: record.id,
921            status: ProcessLifecycleStatus::from(record.status),
922        }
923    }
924}
925
926#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
927pub struct ProcessDefinitionSummary {
928    pub name: String,
929}
930
931impl ProcessDefinitionSummary {
932    pub fn from_input(input: &ProcessInput) -> Option<Self> {
933        match input {
934            ProcessInput::LashlangProcess { process_name, .. } => Some(Self {
935                name: process_name.clone(),
936            }),
937            ProcessInput::ToolCall { .. }
938            | ProcessInput::SessionTurn { .. }
939            | ProcessInput::External { .. } => None,
940        }
941    }
942}
943
944#[derive(Clone, Debug, PartialEq, Eq)]
945pub struct ProcessDefinitionSelector {
946    module_ref: lashlang::ModuleRef,
947    host_requirements_ref: lashlang::HostRequirementsRef,
948    process_ref: lashlang::ProcessRef,
949    process_name: String,
950}
951
952impl ProcessDefinitionSelector {
953    pub fn new(
954        module_ref: lashlang::ModuleRef,
955        host_requirements_ref: lashlang::HostRequirementsRef,
956        process_ref: lashlang::ProcessRef,
957        process_name: impl Into<String>,
958    ) -> Self {
959        Self {
960            module_ref,
961            host_requirements_ref,
962            process_ref,
963            process_name: process_name.into(),
964        }
965    }
966
967    pub fn module_ref(&self) -> &lashlang::ModuleRef {
968        &self.module_ref
969    }
970
971    pub fn host_requirements_ref(&self) -> &lashlang::HostRequirementsRef {
972        &self.host_requirements_ref
973    }
974
975    pub fn process_ref(&self) -> &lashlang::ProcessRef {
976        &self.process_ref
977    }
978
979    pub fn process_name(&self) -> &str {
980        &self.process_name
981    }
982
983    pub fn decode(value: &serde_json::Value) -> Result<Self, String> {
984        if value
985            .get(lashlang::LASH_PROCESS_VALUE_KEY)
986            .and_then(serde_json::Value::as_bool)
987            != Some(true)
988        {
989            return Err("definition must be a process definition value".to_string());
990        }
991        Ok(Self {
992            module_ref: decode_process_definition_field(
993                value,
994                lashlang::LASH_MODULE_REF_KEY,
995                "definition",
996            )?,
997            host_requirements_ref: decode_process_definition_field(
998                value,
999                lashlang::LASH_HOST_REQUIREMENTS_REF_KEY,
1000                "definition",
1001            )?,
1002            process_ref: decode_process_definition_field(
1003                value,
1004                lashlang::LASH_PROCESS_REF_KEY,
1005                "definition",
1006            )?,
1007            process_name: value
1008                .get(lashlang::LASH_PROCESS_NAME_KEY)
1009                .and_then(serde_json::Value::as_str)
1010                .ok_or_else(|| "definition is missing its process name".to_string())?
1011                .to_string(),
1012        })
1013    }
1014
1015    pub fn matches_input(&self, input: &ProcessInput) -> bool {
1016        match input {
1017            ProcessInput::LashlangProcess {
1018                module_ref,
1019                process_ref,
1020                host_requirements_ref,
1021                process_name,
1022                ..
1023            } => {
1024                self.module_ref == *module_ref
1025                    && self.host_requirements_ref == *host_requirements_ref
1026                    && self.process_ref == *process_ref
1027                    && self.process_name == *process_name
1028            }
1029            ProcessInput::ToolCall { .. }
1030            | ProcessInput::SessionTurn { .. }
1031            | ProcessInput::External { .. } => false,
1032        }
1033    }
1034}
1035
1036fn decode_process_definition_field<T: serde::de::DeserializeOwned>(
1037    value: &serde_json::Value,
1038    field: &'static str,
1039    label: &'static str,
1040) -> Result<T, String> {
1041    serde_json::from_value(
1042        value
1043            .get(field)
1044            .cloned()
1045            .ok_or_else(|| format!("{label} is missing {field}"))?,
1046    )
1047    .map_err(|err| format!("{label} has invalid {field}: {err}"))
1048}
1049
1050#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)]
1051pub enum ProcessStatusFilter {
1052    #[default]
1053    Running,
1054    Completed,
1055    Failed,
1056    Cancelled,
1057    Any,
1058}
1059
1060impl ProcessStatusFilter {
1061    pub fn decode(value: Option<&str>) -> Result<Self, String> {
1062        match value.unwrap_or("running") {
1063            "running" => Ok(Self::Running),
1064            "completed" => Ok(Self::Completed),
1065            "failed" => Ok(Self::Failed),
1066            "cancelled" => Ok(Self::Cancelled),
1067            "any" => Ok(Self::Any),
1068            other => Err(format!(
1069                "processes.list status must be `running`, `completed`, `failed`, `cancelled`, or `any`, got `{other}`"
1070            )),
1071        }
1072    }
1073
1074    pub fn list_mode(self) -> ProcessListMode {
1075        match self {
1076            Self::Running => ProcessListMode::Live,
1077            Self::Completed | Self::Failed | Self::Cancelled | Self::Any => ProcessListMode::All,
1078        }
1079    }
1080
1081    pub fn matches(self, status: ProcessLifecycleStatus) -> bool {
1082        match self {
1083            Self::Running => status == ProcessLifecycleStatus::Running,
1084            Self::Completed => status == ProcessLifecycleStatus::Completed,
1085            Self::Failed => status == ProcessLifecycleStatus::Failed,
1086            Self::Cancelled => status == ProcessLifecycleStatus::Cancelled,
1087            Self::Any => true,
1088        }
1089    }
1090}
1091
1092#[derive(Clone, Debug, Default, PartialEq, Eq)]
1093pub struct ProcessListFilter {
1094    pub definition: Option<ProcessDefinitionSelector>,
1095    pub status: ProcessStatusFilter,
1096    pub waiting: Option<bool>,
1097}
1098
1099impl ProcessListFilter {
1100    pub fn decode(args: &serde_json::Value) -> Result<Self, String> {
1101        let map = args
1102            .as_object()
1103            .ok_or_else(|| "processes.list expects a record of process filters".to_string())?;
1104        for key in map.keys() {
1105            match key.as_str() {
1106                "definition" | "status" | "waiting" => {}
1107                _ => return Err(format!("processes.list unknown filter `{key}`")),
1108            }
1109        }
1110        let definition = args
1111            .get("definition")
1112            .map(ProcessDefinitionSelector::decode)
1113            .transpose()?;
1114        let status =
1115            ProcessStatusFilter::decode(args.get("status").and_then(serde_json::Value::as_str))?;
1116        let waiting = args
1117            .get("waiting")
1118            .map(|value| {
1119                value
1120                    .as_bool()
1121                    .ok_or_else(|| "processes.list `waiting` filter must be a boolean".to_string())
1122            })
1123            .transpose()?;
1124        Ok(Self {
1125            definition,
1126            status,
1127            waiting,
1128        })
1129    }
1130
1131    pub fn list_mode(&self) -> ProcessListMode {
1132        self.status.list_mode()
1133    }
1134
1135    pub fn matches_entry(&self, entry: &ProcessHandleGrantEntry) -> bool {
1136        let (_grant, record) = entry;
1137        self.matches_record(record)
1138    }
1139
1140    pub fn matches_record(&self, record: &ProcessRecord) -> bool {
1141        let status = ProcessLifecycleStatus::from(&record.status);
1142        self.status.matches(status)
1143            && self
1144                .definition
1145                .as_ref()
1146                .is_none_or(|definition| definition.matches_input(record.input.as_ref()))
1147            && self
1148                .waiting
1149                .is_none_or(|waiting| record.wait.is_some() == waiting)
1150    }
1151}
1152
1153#[derive(Clone, Copy, Debug, Default, PartialEq, Eq, Serialize, Deserialize)]
1154#[serde(rename_all = "snake_case")]
1155pub enum ProcessListMode {
1156    #[default]
1157    Live,
1158    All,
1159}
1160
1161impl ProcessListMode {
1162    pub fn as_str(self) -> &'static str {
1163        match self {
1164            Self::Live => "live",
1165            Self::All => "all",
1166        }
1167    }
1168}
1169
1170#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
1171pub struct ProcessStartGrant {
1172    pub session_scope: SessionScope,
1173    pub descriptor: ProcessHandleDescriptor,
1174}
1175
1176#[derive(Clone, Debug, Default, PartialEq, Eq, Serialize, Deserialize)]
1177pub struct ProcessSessionDeleteReport {
1178    pub session_id: String,
1179    pub revoked_handle_count: usize,
1180    pub deleted_wake_count: usize,
1181    pub orphaned_process_ids: Vec<String>,
1182    pub preserved_process_ids: Vec<String>,
1183}
1184
1185#[cfg(test)]
1186mod tests {
1187    use serde_json::json;
1188
1189    use super::*;
1190
1191    fn process_ref(component: &str, pos: usize) -> lashlang::ProcessRef {
1192        lashlang::ProcessRef {
1193            component: lashlang::ContentHash::new(component),
1194            pos: pos as u32,
1195        }
1196    }
1197
1198    fn process_value(
1199        module_ref: &lashlang::ModuleRef,
1200        host_requirements_ref: &lashlang::HostRequirementsRef,
1201        process_ref: &lashlang::ProcessRef,
1202        name: &str,
1203    ) -> serde_json::Value {
1204        let mut value = serde_json::Map::new();
1205        value.insert(lashlang::LASH_PROCESS_VALUE_KEY.to_string(), json!(true));
1206        value.insert(lashlang::LASH_MODULE_REF_KEY.to_string(), json!(module_ref));
1207        value.insert(
1208            lashlang::LASH_HOST_REQUIREMENTS_REF_KEY.to_string(),
1209            json!(host_requirements_ref),
1210        );
1211        value.insert(
1212            lashlang::LASH_PROCESS_REF_KEY.to_string(),
1213            json!(process_ref),
1214        );
1215        value.insert(lashlang::LASH_PROCESS_NAME_KEY.to_string(), json!(name));
1216        serde_json::Value::Object(value)
1217    }
1218
1219    fn lashlang_entry(
1220        process_id: &str,
1221        module_ref: lashlang::ModuleRef,
1222        host_requirements_ref: lashlang::HostRequirementsRef,
1223        process_ref: lashlang::ProcessRef,
1224        process_name: &str,
1225        status: ProcessStatus,
1226    ) -> ProcessHandleGrantEntry {
1227        let mut record = ProcessRecord::from_registration(
1228            ProcessRegistration::new(
1229                process_id,
1230                ProcessInput::LashlangProcess {
1231                    module_ref,
1232                    process_ref,
1233                    host_requirements_ref: host_requirements_ref,
1234                    process_name: process_name.to_string(),
1235                    args: serde_json::Map::new(),
1236                },
1237                ProcessProvenance::host(),
1238            )
1239            .with_execution_env_ref(Some(ProcessExecutionEnvRef::new(format!(
1240                "process-env:test:{process_id}"
1241            )))),
1242        );
1243        record.status = status;
1244        (
1245            ProcessHandleGrant {
1246                session_id: "session".to_string(),
1247                process_id: process_id.to_string(),
1248                descriptor: ProcessHandleDescriptor::new(Some("lashlang"), Some(process_name)),
1249            },
1250            record,
1251        )
1252    }
1253
1254    #[test]
1255    fn process_list_filter_matches_definition_and_status() {
1256        let module_ref = lashlang::ModuleRef::new(&lashlang::ContentHash::new("module"));
1257        let host_requirements_ref =
1258            lashlang::HostRequirementsRef::new(&lashlang::ContentHash::new("surface"));
1259        let target_ref = process_ref("target", 0);
1260        let other_ref = process_ref("other", 1);
1261        let filter = ProcessListFilter::decode(&json!({
1262            "definition": process_value(&module_ref, &host_requirements_ref, &target_ref, "target"),
1263            "status": "completed"
1264        }))
1265        .expect("decode filter");
1266
1267        let matching = lashlang_entry(
1268            "matching",
1269            module_ref.clone(),
1270            host_requirements_ref.clone(),
1271            target_ref,
1272            "target",
1273            ProcessStatus::Completed {
1274                await_output: ProcessAwaitOutput::from_tool_output(crate::ToolCallOutput::success(
1275                    json!(true),
1276                )),
1277            },
1278        );
1279        let wrong_definition = lashlang_entry(
1280            "wrong-definition",
1281            module_ref,
1282            host_requirements_ref,
1283            other_ref,
1284            "other",
1285            ProcessStatus::Completed {
1286                await_output: ProcessAwaitOutput::from_tool_output(crate::ToolCallOutput::success(
1287                    json!(true),
1288                )),
1289            },
1290        );
1291
1292        assert_eq!(filter.list_mode(), ProcessListMode::All);
1293        assert!(filter.matches_entry(&matching));
1294        assert!(!filter.matches_entry(&wrong_definition));
1295    }
1296
1297    #[test]
1298    fn process_list_filter_matches_waiting_facet() {
1299        let module_ref = lashlang::ModuleRef::new(&lashlang::ContentHash::new("module"));
1300        let host_requirements_ref =
1301            lashlang::HostRequirementsRef::new(&lashlang::ContentHash::new("surface"));
1302        let process_ref = process_ref("target", 0);
1303        let mut waiting_entry = lashlang_entry(
1304            "waiting",
1305            module_ref.clone(),
1306            host_requirements_ref.clone(),
1307            process_ref.clone(),
1308            "target",
1309            ProcessStatus::Running,
1310        );
1311        waiting_entry.1.wait = Some(WaitState {
1312            since_ms: 42,
1313            kind: WaitKind::Signal {
1314                name: "ready".to_string(),
1315                event_type: "signal.ready".to_string(),
1316                key: "process:waiting:signal.ready:1".to_string(),
1317                ordinal: 1,
1318            },
1319        });
1320        let idle_entry = lashlang_entry(
1321            "idle",
1322            module_ref,
1323            host_requirements_ref,
1324            process_ref,
1325            "target",
1326            ProcessStatus::Running,
1327        );
1328        let waiting_filter =
1329            ProcessListFilter::decode(&json!({ "waiting": true })).expect("decode waiting filter");
1330        let idle_filter =
1331            ProcessListFilter::decode(&json!({ "waiting": false })).expect("decode idle filter");
1332
1333        assert_eq!(waiting_filter.list_mode(), ProcessListMode::Live);
1334        assert!(waiting_filter.matches_entry(&waiting_entry));
1335        assert!(!waiting_filter.matches_entry(&idle_entry));
1336        assert!(!idle_filter.matches_entry(&waiting_entry));
1337        assert!(idle_filter.matches_entry(&idle_entry));
1338        assert!(
1339            ProcessListFilter::decode(&json!({ "waiting": "yes" }))
1340                .expect_err("invalid waiting filter")
1341                .contains("must be a boolean")
1342        );
1343    }
1344}