Skip to main content

lash_core/runtime/process/
model.rs

1use std::collections::BTreeMap;
2use std::fmt;
3use std::sync::Arc;
4use std::sync::Mutex;
5
6use serde::{Deserialize, Serialize};
7
8use super::events::{
9    ProcessAwaitOutput, ProcessEventType, ProcessTerminalSemantics, ProcessTerminalState,
10    default_process_event_types,
11};
12use super::validation::{
13    ensure_core_event_types, process_registration_hash, validate_process_registration,
14};
15
16pub type ProcessId = String;
17
18#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize)]
19#[serde(transparent)]
20pub struct SessionScopeId(String);
21
22impl SessionScopeId {
23    pub fn new(value: impl Into<String>) -> Self {
24        Self(value.into())
25    }
26
27    pub fn as_str(&self) -> &str {
28        &self.0
29    }
30}
31
32impl fmt::Display for SessionScopeId {
33    fn fmt(&self, formatter: &mut fmt::Formatter<'_>) -> fmt::Result {
34        formatter.write_str(&self.0)
35    }
36}
37
38impl From<String> for SessionScopeId {
39    fn from(value: String) -> Self {
40        Self::new(value)
41    }
42}
43
44impl From<&str> for SessionScopeId {
45    fn from(value: &str) -> Self {
46        Self::new(value)
47    }
48}
49
50/// Durable executable input for a process.
51///
52/// `ToolCall`, `SessionTurn`, and `External` are kernel process primitives:
53/// core owns their durable representation and execution semantics because they
54/// are how the runtime coordinates tools, child sessions, and externally
55/// completed work. `Engine` is the extension point for deployment-specific
56/// process runtimes; those rows require a matching [`crate::ProcessEngine`] in
57/// the host's process engine registry.
58#[derive(Debug, Serialize, Deserialize)]
59#[serde(tag = "type", rename_all = "snake_case")]
60pub enum ProcessInput {
61    ToolCall {
62        call: crate::PreparedToolCall,
63    },
64    Engine {
65        kind: String,
66        #[serde(default)]
67        payload: serde_json::Value,
68    },
69    SessionTurn {
70        create_request: Box<crate::SessionCreateRequest>,
71        turn_input: Box<crate::TurnInput>,
72        output_contract: crate::ToolOutputContract,
73    },
74    External {
75        #[serde(default)]
76        metadata: serde_json::Value,
77    },
78}
79
80impl Clone for ProcessInput {
81    fn clone(&self) -> Self {
82        match self {
83            Self::ToolCall { call } => Self::ToolCall { call: call.clone() },
84            Self::Engine { kind, payload } => Self::Engine {
85                kind: kind.clone(),
86                payload: payload.clone(),
87            },
88            Self::SessionTurn {
89                create_request,
90                turn_input,
91                output_contract,
92            } => Self::SessionTurn {
93                create_request: create_request.clone(),
94                turn_input: turn_input.clone(),
95                output_contract: output_contract.clone(),
96            },
97            Self::External { metadata } => Self::External {
98                metadata: metadata.clone(),
99            },
100        }
101    }
102}
103
104impl PartialEq for ProcessInput {
105    fn eq(&self, other: &Self) -> bool {
106        serde_json::to_value(self).ok() == serde_json::to_value(other).ok()
107    }
108}
109
110impl ProcessInput {
111    pub fn engine_kind(&self) -> &'static str {
112        match self {
113            Self::ToolCall { .. } => "tool",
114            Self::Engine { .. } => "engine",
115            Self::SessionTurn { .. } => "session_turn",
116            Self::External { .. } => "external",
117        }
118    }
119
120    pub fn engine_specific_kind(&self) -> Option<&str> {
121        match self {
122            Self::Engine { kind, .. } => Some(kind.as_str()),
123            _ => None,
124        }
125    }
126}
127
128#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
129#[serde(transparent)]
130pub struct ProcessExecutionEnvRef(String);
131
132impl ProcessExecutionEnvRef {
133    pub fn new(value: impl Into<String>) -> Self {
134        Self(value.into())
135    }
136
137    pub fn as_str(&self) -> &str {
138        &self.0
139    }
140}
141
142impl fmt::Display for ProcessExecutionEnvRef {
143    fn fmt(&self, formatter: &mut fmt::Formatter<'_>) -> fmt::Result {
144        formatter.write_str(&self.0)
145    }
146}
147
148#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
149#[serde(deny_unknown_fields)]
150pub struct ProcessExecutionEnvSpec {
151    #[serde(default)]
152    pub plugin_options: crate::PluginOptions,
153    #[serde(default)]
154    pub policy: crate::SessionPolicy,
155}
156
157impl ProcessExecutionEnvSpec {
158    pub fn new(plugin_options: crate::PluginOptions, policy: crate::SessionPolicy) -> Self {
159        Self {
160            plugin_options,
161            policy,
162        }
163    }
164
165    pub fn stable_ref(&self) -> Result<ProcessExecutionEnvRef, serde_json::Error> {
166        crate::stable_hash::stable_json_sha256_hex(self)
167            .map(|hash| ProcessExecutionEnvRef::new(format!("process-env:sha256:{hash}")))
168    }
169
170    pub fn to_store_bytes(&self) -> Result<Vec<u8>, serde_json::Error> {
171        serde_json::to_vec(self)
172    }
173
174    pub fn from_store_bytes(bytes: &[u8]) -> Result<Self, serde_json::Error> {
175        serde_json::from_slice(bytes)
176    }
177}
178
179#[async_trait::async_trait]
180pub trait ProcessExecutionEnvStore: Send + Sync {
181    fn durability_tier(&self) -> crate::DurabilityTier {
182        crate::DurabilityTier::Inline
183    }
184
185    async fn put_process_execution_env(
186        &self,
187        env_ref: &ProcessExecutionEnvRef,
188        bytes: &[u8],
189    ) -> Result<(), crate::PluginError>;
190
191    async fn get_process_execution_env(
192        &self,
193        env_ref: &ProcessExecutionEnvRef,
194    ) -> Result<Option<Vec<u8>>, crate::PluginError>;
195}
196
197#[derive(Default)]
198pub struct InMemoryProcessExecutionEnvStore {
199    envs: Mutex<BTreeMap<String, Vec<u8>>>,
200}
201
202impl InMemoryProcessExecutionEnvStore {
203    pub fn new() -> Self {
204        Self::default()
205    }
206}
207
208#[async_trait::async_trait]
209impl ProcessExecutionEnvStore for InMemoryProcessExecutionEnvStore {
210    async fn put_process_execution_env(
211        &self,
212        env_ref: &ProcessExecutionEnvRef,
213        bytes: &[u8],
214    ) -> Result<(), crate::PluginError> {
215        self.envs
216            .lock()
217            .map_err(|_| {
218                crate::PluginError::Session("process execution env store lock poisoned".to_string())
219            })?
220            .insert(env_ref.as_str().to_string(), bytes.to_vec());
221        Ok(())
222    }
223
224    async fn get_process_execution_env(
225        &self,
226        env_ref: &ProcessExecutionEnvRef,
227    ) -> Result<Option<Vec<u8>>, crate::PluginError> {
228        Ok(self
229            .envs
230            .lock()
231            .map_err(|_| {
232                crate::PluginError::Session("process execution env store lock poisoned".to_string())
233            })?
234            .get(env_ref.as_str())
235            .cloned())
236    }
237}
238
239pub async fn persist_process_execution_env(
240    env_store: &dyn ProcessExecutionEnvStore,
241    spec: &ProcessExecutionEnvSpec,
242) -> Result<ProcessExecutionEnvRef, crate::PluginError> {
243    let env_ref = spec.stable_ref().map_err(|err| {
244        crate::PluginError::Session(format!("failed to hash process execution env: {err}"))
245    })?;
246    let bytes = spec.to_store_bytes().map_err(|err| {
247        crate::PluginError::Session(format!("failed to encode process execution env: {err}"))
248    })?;
249    env_store
250        .put_process_execution_env(&env_ref, &bytes)
251        .await?;
252    Ok(env_ref)
253}
254
255pub async fn load_process_execution_env(
256    env_store: &dyn ProcessExecutionEnvStore,
257    env_ref: &ProcessExecutionEnvRef,
258) -> Result<ProcessExecutionEnvSpec, crate::PluginError> {
259    let bytes = env_store
260        .get_process_execution_env(env_ref)
261        .await?
262        .ok_or_else(|| {
263            crate::PluginError::Session(format!("missing process execution env `{env_ref}`"))
264        })?;
265    ProcessExecutionEnvSpec::from_store_bytes(&bytes).map_err(|err| {
266        crate::PluginError::Session(format!(
267            "failed to decode process execution env `{env_ref}`: {err}"
268        ))
269    })
270}
271
272/// Execution-local context for process runners. Durable edges live on the
273/// process record.
274#[derive(Clone, Debug, Default, Serialize, Deserialize)]
275pub struct ProcessExecutionContext {
276    #[serde(default, skip_serializing_if = "Option::is_none")]
277    pub causal_invocation: Option<crate::RuntimeInvocation>,
278}
279
280impl ProcessExecutionContext {
281    pub fn with_causal_invocation(mut self, invocation: Option<crate::RuntimeInvocation>) -> Self {
282        self.causal_invocation = invocation;
283        self
284    }
285
286    pub fn is_empty(&self) -> bool {
287        self.causal_invocation.is_none()
288    }
289}
290
291#[derive(Clone)]
292pub struct ProcessOpScope<'scope> {
293    pub(crate) parent_invocation: Option<crate::RuntimeInvocation>,
294    pub(crate) effect_controller: crate::runtime::RuntimeEffectControllerHandle<'scope>,
295    pub(crate) agent_frame_id: Option<crate::AgentFrameId>,
296    pub(crate) target_agent_frame_id: Option<crate::AgentFrameId>,
297}
298
299impl<'scope> ProcessOpScope<'scope> {
300    pub fn new(scoped_effect_controller: crate::ScopedEffectController<'scope>) -> Self {
301        Self {
302            parent_invocation: None,
303            effect_controller: crate::runtime::RuntimeEffectControllerHandle::borrowed(
304                scoped_effect_controller,
305            ),
306            agent_frame_id: None,
307            target_agent_frame_id: None,
308        }
309    }
310
311    pub fn with_parent_invocation(
312        mut self,
313        parent_invocation: Option<crate::RuntimeInvocation>,
314    ) -> Self {
315        self.parent_invocation = parent_invocation;
316        self
317    }
318
319    pub fn with_agent_frame_id(mut self, agent_frame_id: Option<crate::AgentFrameId>) -> Self {
320        self.agent_frame_id = agent_frame_id;
321        self
322    }
323
324    pub fn with_target_agent_frame_id(
325        mut self,
326        agent_frame_id: Option<crate::AgentFrameId>,
327    ) -> Self {
328        self.target_agent_frame_id = agent_frame_id;
329        self
330    }
331
332    pub fn agent_frame_id(&self) -> Option<&str> {
333        self.agent_frame_id.as_deref()
334    }
335
336    pub fn target_agent_frame_id(&self) -> Option<&str> {
337        self.target_agent_frame_id.as_deref()
338    }
339
340    pub(crate) fn controller(&self) -> &dyn crate::RuntimeEffectController {
341        self.effect_controller.controller()
342    }
343}
344
345#[derive(Clone, Debug, Default)]
346pub struct ProcessStartOptions {
347    pub descriptor: Option<ProcessHandleDescriptor>,
348    /// Runtime-internal spawn provenance override. Set by process execution
349    /// contexts so children started *by a process* inherit the parent's
350    /// originator and wake target instead of being stamped with the ephemeral
351    /// execution scope. `None` means the session start path stamps the
352    /// creating session (the in-session meaning of "start"). This rides
353    /// options — not the request — so in-session callers cannot forge
354    /// provenance through the session surface.
355    pub spawn_provenance: Option<ProcessSpawnProvenance>,
356}
357
358/// Provenance a process-run context hands to its children: the chain's
359/// originator and the chain's wake target. Mirrors the trigger fire path,
360/// where the spawned process inherits the registrant and the grant is derived
361/// from the wake target.
362#[derive(Clone, Debug, PartialEq, Eq)]
363pub struct ProcessSpawnProvenance {
364    pub originator: ProcessOriginator,
365    pub wake_target: Option<SessionScope>,
366}
367
368impl ProcessStartOptions {
369    pub fn new() -> Self {
370        Self::default()
371    }
372
373    pub fn with_descriptor(mut self, descriptor: ProcessHandleDescriptor) -> Self {
374        self.descriptor = Some(descriptor);
375        self
376    }
377
378    pub fn with_optional_descriptor(mut self, descriptor: Option<ProcessHandleDescriptor>) -> Self {
379        self.descriptor = descriptor;
380        self
381    }
382
383    pub fn with_spawn_provenance(mut self, spawn_provenance: ProcessSpawnProvenance) -> Self {
384        self.spawn_provenance = Some(spawn_provenance);
385        self
386    }
387
388    pub fn execution_context(&self, scope: &ProcessOpScope<'_>) -> ProcessExecutionContext {
389        ProcessExecutionContext {
390            causal_invocation: scope.parent_invocation.clone(),
391        }
392    }
393}
394
395/// Public host-facing request for starting a visible process handle.
396#[derive(Clone, Debug, Serialize, Deserialize)]
397pub struct ProcessStartRequest {
398    pub id: ProcessId,
399    pub input: ProcessInput,
400    #[serde(default, skip_serializing_if = "Option::is_none")]
401    pub env_spec: Option<ProcessExecutionEnvSpec>,
402    pub originator: ProcessOriginator,
403    #[serde(default, skip_serializing_if = "Option::is_none")]
404    pub wake_target: Option<SessionScope>,
405    #[serde(default, skip_serializing_if = "Option::is_none")]
406    pub grant: Option<ProcessStartGrant>,
407    #[serde(default)]
408    pub event_types: Vec<ProcessEventType>,
409}
410
411impl ProcessStartRequest {
412    pub fn new(
413        id: impl Into<ProcessId>,
414        input: ProcessInput,
415        originator: ProcessOriginator,
416    ) -> Self {
417        Self {
418            id: id.into(),
419            input,
420            env_spec: None,
421            originator,
422            wake_target: None,
423            grant: None,
424            event_types: default_process_event_types(),
425        }
426    }
427
428    pub fn external(
429        id: impl Into<ProcessId>,
430        originator: ProcessOriginator,
431        metadata: serde_json::Value,
432    ) -> Self {
433        Self::new(id, ProcessInput::External { metadata }, originator)
434    }
435
436    pub fn with_env_spec(mut self, env_spec: ProcessExecutionEnvSpec) -> Self {
437        self.env_spec = Some(env_spec);
438        self
439    }
440
441    pub fn with_wake_target(mut self, wake_target: Option<SessionScope>) -> Self {
442        self.wake_target = wake_target;
443        self
444    }
445
446    pub fn with_grant(mut self, grant: Option<ProcessStartGrant>) -> Self {
447        self.grant = grant;
448        self
449    }
450
451    pub fn with_event_types(
452        mut self,
453        event_types: impl IntoIterator<Item = ProcessEventType>,
454    ) -> Self {
455        self.event_types = event_types.into_iter().collect();
456        self
457    }
458
459    pub fn with_extra_event_types(
460        mut self,
461        event_types: impl IntoIterator<Item = ProcessEventType>,
462    ) -> Self {
463        self.event_types.extend(event_types);
464        self
465    }
466
467    pub fn into_registration(self, env_ref: Option<ProcessExecutionEnvRef>) -> ProcessRegistration {
468        ProcessRegistration::new(self.id, self.input, ProcessProvenance::new(self.originator))
469            .with_event_types(self.event_types)
470            .with_execution_env_ref(env_ref)
471            .with_wake_target(self.wake_target)
472    }
473}
474
475#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
476pub struct SessionScope {
477    pub session_id: String,
478    #[serde(default, skip_serializing_if = "Option::is_none")]
479    pub agent_frame_id: Option<crate::AgentFrameId>,
480}
481
482#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
483pub struct ProcessProvenance {
484    pub originator: ProcessOriginator,
485    #[serde(default, skip_serializing_if = "Option::is_none")]
486    pub caused_by: Option<crate::CausalRef>,
487}
488
489impl ProcessProvenance {
490    pub fn new(originator: ProcessOriginator) -> Self {
491        Self {
492            originator,
493            caused_by: None,
494        }
495    }
496
497    pub fn host() -> Self {
498        Self::new(ProcessOriginator::host())
499    }
500
501    pub fn session(scope: SessionScope) -> Self {
502        Self::new(ProcessOriginator::session(scope))
503    }
504
505    pub fn with_caused_by(mut self, caused_by: Option<crate::CausalRef>) -> Self {
506        self.caused_by = caused_by;
507        self
508    }
509}
510
511#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
512#[serde(tag = "type", rename_all = "snake_case")]
513pub enum ProcessOriginator {
514    Host,
515    Session { scope: SessionScope },
516}
517
518impl ProcessOriginator {
519    pub fn host() -> Self {
520        Self::Host
521    }
522
523    pub fn session(scope: SessionScope) -> Self {
524        Self::Session { scope }
525    }
526
527    pub fn scope_id(&self) -> String {
528        match self {
529            Self::Host => "host".to_string(),
530            Self::Session { scope } => scope.id().to_string(),
531        }
532    }
533}
534
535impl SessionScope {
536    pub fn new(session_id: impl Into<String>) -> Self {
537        Self {
538            session_id: session_id.into(),
539            agent_frame_id: None,
540        }
541    }
542
543    pub fn for_agent_frame(
544        session_id: impl Into<String>,
545        agent_frame_id: impl Into<crate::AgentFrameId>,
546    ) -> Self {
547        Self {
548            session_id: session_id.into(),
549            agent_frame_id: Some(agent_frame_id.into()),
550        }
551    }
552
553    pub fn id(&self) -> SessionScopeId {
554        match self.agent_frame_id.as_deref() {
555            Some(frame_id) if !frame_id.is_empty() => {
556                SessionScopeId::new(format!("session:{}/frame:{frame_id}", self.session_id))
557            }
558            _ => SessionScopeId::new(format!("session:{}", self.session_id)),
559        }
560    }
561
562    pub fn is_empty(&self) -> bool {
563        self.session_id.is_empty()
564    }
565}
566
567/// Serializable process spec used to start or recover a runtime process.
568#[derive(Debug, Serialize, Deserialize)]
569pub struct ProcessRegistration {
570    pub id: ProcessId,
571    pub input: Arc<ProcessInput>,
572    pub identity: ProcessIdentity,
573    #[serde(default)]
574    pub event_types: Vec<ProcessEventType>,
575    pub provenance: ProcessProvenance,
576    #[serde(default, skip_serializing_if = "Option::is_none")]
577    pub env_ref: Option<ProcessExecutionEnvRef>,
578    #[serde(default, skip_serializing_if = "Option::is_none")]
579    pub wake_target: Option<SessionScope>,
580}
581
582impl Clone for ProcessRegistration {
583    fn clone(&self) -> Self {
584        Self {
585            id: self.id.clone(),
586            input: Arc::clone(&self.input),
587            identity: self.identity.clone(),
588            event_types: self.event_types.clone(),
589            provenance: self.provenance.clone(),
590            env_ref: self.env_ref.clone(),
591            wake_target: self.wake_target.clone(),
592        }
593    }
594}
595
596impl ProcessRegistration {
597    pub fn new(
598        id: impl Into<ProcessId>,
599        input: ProcessInput,
600        provenance: ProcessProvenance,
601    ) -> Self {
602        let identity = ProcessIdentity::from_process_input(&input);
603        Self {
604            id: id.into(),
605            input: Arc::new(input),
606            identity,
607            event_types: default_process_event_types(),
608            provenance,
609            env_ref: None,
610            wake_target: None,
611        }
612    }
613
614    pub(crate) fn session_start_draft(id: impl Into<ProcessId>, input: ProcessInput) -> Self {
615        Self::new(id, input, ProcessProvenance::host())
616    }
617
618    pub fn with_process_provenance(mut self, provenance: ProcessProvenance) -> Self {
619        self.provenance = provenance;
620        self
621    }
622
623    pub fn with_execution_env_ref(mut self, env_ref: Option<ProcessExecutionEnvRef>) -> Self {
624        self.env_ref = env_ref;
625        self
626    }
627
628    pub fn with_wake_target(mut self, wake_target: Option<SessionScope>) -> Self {
629        self.wake_target = wake_target;
630        self
631    }
632
633    pub fn with_identity(mut self, identity: ProcessIdentity) -> Self {
634        self.identity = identity;
635        self
636    }
637
638    pub fn with_event_types(
639        mut self,
640        event_types: impl IntoIterator<Item = ProcessEventType>,
641    ) -> Self {
642        self.event_types = event_types.into_iter().collect();
643        self
644    }
645
646    pub fn with_extra_event_types(
647        mut self,
648        event_types: impl IntoIterator<Item = ProcessEventType>,
649    ) -> Self {
650        self.event_types.extend(event_types);
651        self
652    }
653}
654
655#[derive(Clone, Debug, Default, PartialEq, Serialize, Deserialize)]
656#[serde(tag = "state", rename_all = "snake_case")]
657pub enum ProcessStatus {
658    #[default]
659    Running,
660    Completed {
661        await_output: ProcessAwaitOutput,
662    },
663    Failed {
664        await_output: ProcessAwaitOutput,
665    },
666    Cancelled {
667        await_output: ProcessAwaitOutput,
668    },
669}
670
671impl ProcessStatus {
672    pub fn from_terminal(terminal: ProcessTerminalSemantics) -> Self {
673        match terminal.state {
674            ProcessTerminalState::Completed => Self::Completed {
675                await_output: terminal.await_output,
676            },
677            ProcessTerminalState::Failed => Self::Failed {
678                await_output: terminal.await_output,
679            },
680            ProcessTerminalState::Cancelled => Self::Cancelled {
681                await_output: terminal.await_output,
682            },
683        }
684    }
685
686    pub fn is_terminal(&self) -> bool {
687        !matches!(self, Self::Running)
688    }
689
690    pub fn label(&self) -> &'static str {
691        match self {
692            Self::Running => "running",
693            Self::Completed { .. } => "completed",
694            Self::Failed { .. } => "failed",
695            Self::Cancelled { .. } => "cancelled",
696        }
697    }
698
699    pub fn terminal_state(&self) -> Option<ProcessTerminalState> {
700        match self {
701            Self::Running => None,
702            Self::Completed { .. } => Some(ProcessTerminalState::Completed),
703            Self::Failed { .. } => Some(ProcessTerminalState::Failed),
704            Self::Cancelled { .. } => Some(ProcessTerminalState::Cancelled),
705        }
706    }
707
708    pub fn await_output(&self) -> Option<&ProcessAwaitOutput> {
709        match self {
710            Self::Running => None,
711            Self::Completed { await_output }
712            | Self::Failed { await_output }
713            | Self::Cancelled { await_output } => Some(await_output),
714        }
715    }
716
717    pub fn terminal_semantics(&self) -> Option<ProcessTerminalSemantics> {
718        Some(ProcessTerminalSemantics {
719            state: self.terminal_state()?,
720            await_output: self.await_output()?.clone(),
721        })
722    }
723}
724
725/// Durable process row. Session-visible addressability lives in
726/// [`ProcessHandleGrant`], not in the process record.
727#[derive(Clone, Debug, Serialize, Deserialize)]
728pub struct ProcessRecord {
729    pub id: ProcessId,
730    pub registration_hash: String,
731    pub input: Arc<ProcessInput>,
732    pub identity: ProcessIdentity,
733    #[serde(default)]
734    pub event_types: Vec<ProcessEventType>,
735    pub provenance: ProcessProvenance,
736    #[serde(default, skip_serializing_if = "Option::is_none")]
737    pub env_ref: Option<ProcessExecutionEnvRef>,
738    #[serde(default, skip_serializing_if = "Option::is_none")]
739    pub wake_target: Option<SessionScope>,
740    #[serde(default)]
741    pub created_at_ms: u64,
742    #[serde(default)]
743    pub updated_at_ms: u64,
744    #[serde(default, skip_serializing_if = "Option::is_none")]
745    pub external_ref: Option<ProcessExternalRef>,
746    #[serde(default, skip_serializing_if = "Option::is_none")]
747    pub wait: Option<WaitState>,
748    #[serde(default)]
749    pub status: ProcessStatus,
750}
751
752#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
753pub struct WaitState {
754    pub kind: WaitKind,
755    pub since_ms: u64,
756}
757
758#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
759#[serde(tag = "kind", rename_all = "snake_case")]
760pub enum WaitKind {
761    Signal {
762        name: String,
763        event_type: String,
764        key: String,
765        ordinal: u64,
766    },
767}
768
769impl ProcessRecord {
770    pub fn from_registration(registration: ProcessRegistration) -> Self {
771        Self::from_registration_with_clock(registration, &crate::SystemClock)
772    }
773
774    pub fn from_registration_with_clock(
775        mut registration: ProcessRegistration,
776        clock: &dyn crate::Clock,
777    ) -> Self {
778        ensure_core_event_types(&mut registration);
779        validate_process_registration(&registration)
780            .expect("process registration should be valid before record construction");
781        let registration_hash = process_registration_hash(&registration)
782            .expect("process registration should hash before record construction");
783        Self::from_prepared_registration(registration, registration_hash, clock.timestamp_ms())
784    }
785
786    pub fn from_prepared_registration(
787        registration: ProcessRegistration,
788        registration_hash: String,
789        now_ms: u64,
790    ) -> Self {
791        Self {
792            id: registration.id,
793            registration_hash,
794            input: registration.input,
795            identity: registration.identity,
796            event_types: registration.event_types,
797            provenance: registration.provenance,
798            env_ref: registration.env_ref,
799            wake_target: registration.wake_target,
800            created_at_ms: now_ms,
801            updated_at_ms: now_ms,
802            external_ref: None,
803            wait: None,
804            status: ProcessStatus::Running,
805        }
806    }
807
808    pub fn is_terminal(&self) -> bool {
809        self.status.is_terminal()
810    }
811
812    pub fn clear_wake_target_for_session(&mut self, session_id: &str) -> bool {
813        self.clear_wake_target_for_session_with_clock(session_id, &crate::SystemClock)
814    }
815
816    pub fn clear_wake_target_for_session_with_clock(
817        &mut self,
818        session_id: &str,
819        clock: &dyn crate::Clock,
820    ) -> bool {
821        let should_clear = self
822            .wake_target
823            .as_ref()
824            .is_some_and(|scope| scope.session_id == session_id);
825        if should_clear {
826            self.wake_target = None;
827            self.updated_at_ms = clock.timestamp_ms();
828        }
829        should_clear
830    }
831
832    pub fn originator_scope_id(&self) -> String {
833        self.provenance.originator.scope_id()
834    }
835}
836
837/// Canonical process identity stored alongside every durable process row.
838///
839/// `ProcessInput::Engine` keeps its payload opaque to core. Engines therefore
840/// publish their visible kind, display label, and definition identity at the
841/// registration boundary; list, summary, trigger, and observation paths read
842/// this durable field instead of decoding engine payload conventions.
843#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
844pub struct ProcessIdentity {
845    pub kind: String,
846    #[serde(default, skip_serializing_if = "Option::is_none")]
847    pub label: Option<String>,
848    #[serde(default, skip_serializing_if = "Option::is_none")]
849    pub definition: Option<serde_json::Value>,
850}
851
852impl ProcessIdentity {
853    pub fn new(kind: impl Into<String>) -> Self {
854        Self {
855            kind: kind.into(),
856            label: None,
857            definition: None,
858        }
859    }
860
861    pub fn with_label(mut self, label: Option<impl Into<String>>) -> Self {
862        self.label = label.map(Into::into);
863        self
864    }
865
866    pub fn with_definition(mut self, definition: Option<serde_json::Value>) -> Self {
867        self.definition = definition;
868        self
869    }
870
871    pub fn from_process_input(input: &ProcessInput) -> Self {
872        match input {
873            ProcessInput::ToolCall { call } => {
874                Self::new("tool").with_label(Some(call.tool_name.clone()))
875            }
876            ProcessInput::Engine { kind, .. } => Self::new(kind.clone()),
877            ProcessInput::SessionTurn { create_request, .. } => {
878                let label = create_request
879                    .subagent
880                    .as_ref()
881                    .map(|subagent| subagent.capability.clone())
882                    .or_else(|| create_request.usage_source.clone())
883                    .or_else(|| create_request.session_id.clone());
884                Self::new("session_turn").with_label(label)
885            }
886            ProcessInput::External { metadata } => {
887                let label = metadata
888                    .get("label")
889                    .or_else(|| metadata.get("name"))
890                    .or_else(|| metadata.get("title"))
891                    .and_then(serde_json::Value::as_str)
892                    .map(str::to_string);
893                Self::new("external").with_label(label)
894            }
895        }
896    }
897}
898
899/// Wire-format version stamped on every persisted [`ProcessLease`].
900///
901/// Bump when the on-wire shape of `ProcessLease` changes in a way that older
902/// code cannot safely deserialize.
903pub const PROCESS_LEASE_SCHEMA_VERSION: u32 = 1;
904
905/// Durable lease over a non-terminal background process.
906///
907/// The lease pair `(owner_id, lease_token)` plus `fencing_token` are how lash guarantees that
908/// one non-terminal process is re-executed by exactly one worker at a time —
909/// even after a crash, even across two workers that both sweep the same
910/// registry for recoverable work. The durable backend
911/// (`lash-sqlite-store`) uses these to serialize concurrent claims on the same
912/// `process_id`; future distributed durable backends use the *same* fields to
913/// coordinate workers that don't share a file system.
914///
915/// **This is not single-process theatre.** The owner / fencing-token /
916/// lease-token triple is the public contract that lets any backend detect and
917/// reject stale writers. Treat it as load-bearing, not defensive.
918#[derive(Clone, Debug, Serialize, Deserialize)]
919pub struct ProcessLease {
920    pub schema_version: u32,
921    pub process_id: ProcessId,
922    pub owner_id: String,
923    pub lease_token: String,
924    pub fencing_token: u64,
925    pub claimed_at_epoch_ms: u64,
926    pub expires_at_epoch_ms: u64,
927}
928
929#[derive(Clone, Debug, Serialize, Deserialize)]
930pub struct ProcessLeaseCompletion {
931    pub process_id: ProcessId,
932    pub lease_token: String,
933}
934
935impl ProcessLeaseCompletion {
936    pub fn from_lease(lease: &ProcessLease) -> Self {
937        Self {
938            process_id: lease.process_id.clone(),
939            lease_token: lease.lease_token.clone(),
940        }
941    }
942}
943
944/// Durable backend reference for background work accepted outside the local process.
945#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq)]
946pub struct ProcessExternalRef {
947    pub backend: String,
948    pub id: String,
949    #[serde(default, skip_serializing_if = "Option::is_none")]
950    pub metadata: Option<serde_json::Value>,
951}
952
953#[derive(Clone, Debug, Default, PartialEq, Eq, Serialize, Deserialize)]
954pub struct ProcessHandleDescriptor {
955    #[serde(default, skip_serializing_if = "Option::is_none")]
956    pub kind: Option<String>,
957    #[serde(default, skip_serializing_if = "Option::is_none")]
958    pub label: Option<String>,
959}
960
961impl ProcessHandleDescriptor {
962    pub fn new(kind: Option<impl Into<String>>, label: Option<impl Into<String>>) -> Self {
963        Self {
964            kind: kind.map(Into::into),
965            label: label.map(Into::into),
966        }
967    }
968}
969
970#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
971pub struct ProcessHandleGrant {
972    pub session_id: String,
973    pub process_id: ProcessId,
974    pub descriptor: ProcessHandleDescriptor,
975}
976
977pub type ProcessHandleGrantEntry = (ProcessHandleGrant, ProcessRecord);
978
979#[derive(Clone, Copy, Debug, Default, PartialEq, Eq, Serialize, Deserialize)]
980#[serde(rename_all = "snake_case")]
981pub enum ProcessLifecycleStatus {
982    #[default]
983    Running,
984    Completed,
985    Failed,
986    Cancelled,
987}
988
989impl ProcessLifecycleStatus {
990    pub fn label(self) -> &'static str {
991        match self {
992            Self::Running => "running",
993            Self::Completed => "completed",
994            Self::Failed => "failed",
995            Self::Cancelled => "cancelled",
996        }
997    }
998
999    pub fn is_terminal(self) -> bool {
1000        !matches!(self, Self::Running)
1001    }
1002
1003    pub fn terminal_state(self) -> Option<ProcessTerminalState> {
1004        match self {
1005            Self::Running => None,
1006            Self::Completed => Some(ProcessTerminalState::Completed),
1007            Self::Failed => Some(ProcessTerminalState::Failed),
1008            Self::Cancelled => Some(ProcessTerminalState::Cancelled),
1009        }
1010    }
1011}
1012
1013impl From<&ProcessStatus> for ProcessLifecycleStatus {
1014    fn from(status: &ProcessStatus) -> Self {
1015        match status {
1016            ProcessStatus::Running => Self::Running,
1017            ProcessStatus::Completed { .. } => Self::Completed,
1018            ProcessStatus::Failed { .. } => Self::Failed,
1019            ProcessStatus::Cancelled { .. } => Self::Cancelled,
1020        }
1021    }
1022}
1023
1024impl From<ProcessStatus> for ProcessLifecycleStatus {
1025    fn from(status: ProcessStatus) -> Self {
1026        Self::from(&status)
1027    }
1028}
1029
1030#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
1031pub struct ProcessHandleSummary {
1032    #[serde(rename = "__handle__")]
1033    pub handle_type: String,
1034    pub id: ProcessId,
1035    pub process_id: ProcessId,
1036    pub descriptor: ProcessHandleDescriptor,
1037    #[serde(default, skip_serializing_if = "Option::is_none")]
1038    pub definition: Option<serde_json::Value>,
1039    pub status: ProcessLifecycleStatus,
1040}
1041
1042impl ProcessHandleSummary {
1043    pub fn new(
1044        process_id: impl Into<ProcessId>,
1045        descriptor: ProcessHandleDescriptor,
1046        status: ProcessLifecycleStatus,
1047    ) -> Self {
1048        let process_id = process_id.into();
1049        Self {
1050            handle_type: "process".to_string(),
1051            id: process_id.clone(),
1052            process_id,
1053            descriptor,
1054            definition: None,
1055            status,
1056        }
1057    }
1058
1059    pub fn with_definition(mut self, definition: Option<serde_json::Value>) -> Self {
1060        self.definition = definition;
1061        self
1062    }
1063
1064    pub fn from_grant_record(grant: ProcessHandleGrant, record: ProcessRecord) -> Self {
1065        let definition = record.identity.definition.clone();
1066        Self::new(
1067            record.id,
1068            grant.descriptor,
1069            ProcessLifecycleStatus::from(record.status),
1070        )
1071        .with_definition(definition)
1072    }
1073}
1074
1075impl From<ProcessHandleGrantEntry> for ProcessHandleSummary {
1076    fn from((grant, record): ProcessHandleGrantEntry) -> Self {
1077        Self::from_grant_record(grant, record)
1078    }
1079}
1080
1081#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
1082pub struct ProcessCancelSummary {
1083    pub process_id: ProcessId,
1084    pub status: ProcessLifecycleStatus,
1085}
1086
1087impl ProcessCancelSummary {
1088    pub fn from_record(record: ProcessRecord) -> Self {
1089        Self {
1090            process_id: record.id,
1091            status: ProcessLifecycleStatus::from(record.status),
1092        }
1093    }
1094}
1095
1096#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)]
1097pub enum ProcessStatusFilter {
1098    #[default]
1099    Running,
1100    Completed,
1101    Failed,
1102    Cancelled,
1103    Any,
1104}
1105
1106impl ProcessStatusFilter {
1107    pub fn decode(value: Option<&str>) -> Result<Self, String> {
1108        match value.unwrap_or("running") {
1109            "running" => Ok(Self::Running),
1110            "completed" => Ok(Self::Completed),
1111            "failed" => Ok(Self::Failed),
1112            "cancelled" => Ok(Self::Cancelled),
1113            "any" => Ok(Self::Any),
1114            other => Err(format!(
1115                "processes.list status must be `running`, `completed`, `failed`, `cancelled`, or `any`, got `{other}`"
1116            )),
1117        }
1118    }
1119
1120    pub fn list_mode(self) -> ProcessListMode {
1121        match self {
1122            Self::Running => ProcessListMode::Live,
1123            Self::Completed | Self::Failed | Self::Cancelled | Self::Any => ProcessListMode::All,
1124        }
1125    }
1126
1127    pub fn matches(self, status: ProcessLifecycleStatus) -> bool {
1128        match self {
1129            Self::Running => status == ProcessLifecycleStatus::Running,
1130            Self::Completed => status == ProcessLifecycleStatus::Completed,
1131            Self::Failed => status == ProcessLifecycleStatus::Failed,
1132            Self::Cancelled => status == ProcessLifecycleStatus::Cancelled,
1133            Self::Any => true,
1134        }
1135    }
1136}
1137
1138#[derive(Clone, Debug, Default, PartialEq)]
1139pub struct ProcessListFilter {
1140    pub definition: Option<serde_json::Value>,
1141    pub status: ProcessStatusFilter,
1142    pub waiting: Option<bool>,
1143}
1144
1145impl ProcessListFilter {
1146    pub fn decode(args: &serde_json::Value) -> Result<Self, String> {
1147        let map = args
1148            .as_object()
1149            .ok_or_else(|| "processes.list expects a record of process filters".to_string())?;
1150        for key in map.keys() {
1151            match key.as_str() {
1152                "definition" | "status" | "waiting" => {}
1153                _ => return Err(format!("processes.list unknown filter `{key}`")),
1154            }
1155        }
1156        let definition = args.get("definition").cloned();
1157        let status =
1158            ProcessStatusFilter::decode(args.get("status").and_then(serde_json::Value::as_str))?;
1159        let waiting = args
1160            .get("waiting")
1161            .map(|value| {
1162                value
1163                    .as_bool()
1164                    .ok_or_else(|| "processes.list `waiting` filter must be a boolean".to_string())
1165            })
1166            .transpose()?;
1167        Ok(Self {
1168            definition,
1169            status,
1170            waiting,
1171        })
1172    }
1173
1174    pub fn list_mode(&self) -> ProcessListMode {
1175        self.status.list_mode()
1176    }
1177
1178    pub fn matches_entry(&self, entry: &ProcessHandleGrantEntry) -> bool {
1179        let (_grant, record) = entry;
1180        self.matches_record(record)
1181    }
1182
1183    pub fn matches_record(&self, record: &ProcessRecord) -> bool {
1184        let status = ProcessLifecycleStatus::from(&record.status);
1185        self.status.matches(status)
1186            && self
1187                .definition
1188                .as_ref()
1189                .is_none_or(|definition| record.identity.definition.as_ref() == Some(definition))
1190            && self
1191                .waiting
1192                .is_none_or(|waiting| record.wait.is_some() == waiting)
1193    }
1194}
1195
1196#[derive(Clone, Copy, Debug, Default, PartialEq, Eq, Serialize, Deserialize)]
1197#[serde(rename_all = "snake_case")]
1198pub enum ProcessListMode {
1199    #[default]
1200    Live,
1201    All,
1202}
1203
1204impl ProcessListMode {
1205    pub fn as_str(self) -> &'static str {
1206        match self {
1207            Self::Live => "live",
1208            Self::All => "all",
1209        }
1210    }
1211}
1212
1213#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
1214pub struct ProcessStartGrant {
1215    pub session_scope: SessionScope,
1216    pub descriptor: ProcessHandleDescriptor,
1217}
1218
1219#[derive(Clone, Debug, Default, PartialEq, Eq, Serialize, Deserialize)]
1220pub struct ProcessSessionDeleteReport {
1221    pub session_id: String,
1222    pub revoked_handle_count: usize,
1223    pub deleted_wake_count: usize,
1224    pub orphaned_process_ids: Vec<String>,
1225    pub preserved_process_ids: Vec<String>,
1226}
1227
1228#[cfg(test)]
1229mod tests {
1230    use serde_json::json;
1231
1232    use super::*;
1233
1234    fn process_value(component: &str, pos: usize, name: &str) -> serde_json::Value {
1235        json!({
1236            "component": component,
1237            "pos": pos,
1238            "name": name,
1239        })
1240    }
1241
1242    fn engine_entry(
1243        process_id: &str,
1244        definition: serde_json::Value,
1245        process_name: &str,
1246        status: ProcessStatus,
1247    ) -> ProcessHandleGrantEntry {
1248        let mut record = ProcessRecord::from_registration(
1249            ProcessRegistration::new(
1250                process_id,
1251                ProcessInput::Engine {
1252                    kind: "test-engine".to_string(),
1253                    payload: json!({
1254                        "definition": definition.clone(),
1255                        "label": process_name,
1256                    }),
1257                },
1258                ProcessProvenance::host(),
1259            )
1260            .with_identity(
1261                ProcessIdentity::new("test-engine")
1262                    .with_label(Some(process_name))
1263                    .with_definition(Some(definition)),
1264            )
1265            .with_execution_env_ref(Some(ProcessExecutionEnvRef::new(format!(
1266                "process-env:test:{process_id}"
1267            )))),
1268        );
1269        record.status = status;
1270        (
1271            ProcessHandleGrant {
1272                session_id: "session".to_string(),
1273                process_id: process_id.to_string(),
1274                descriptor: ProcessHandleDescriptor::new(Some("test-engine"), Some(process_name)),
1275            },
1276            record,
1277        )
1278    }
1279
1280    #[test]
1281    fn process_list_filter_matches_definition_and_status() {
1282        let target_ref = process_value("target", 0, "target");
1283        let other_ref = process_value("other", 1, "other");
1284        let filter = ProcessListFilter::decode(&json!({
1285            "definition": target_ref,
1286            "status": "completed"
1287        }))
1288        .expect("decode filter");
1289
1290        let matching = engine_entry(
1291            "matching",
1292            target_ref,
1293            "target",
1294            ProcessStatus::Completed {
1295                await_output: ProcessAwaitOutput::from_tool_output(crate::ToolCallOutput::success(
1296                    json!(true),
1297                )),
1298            },
1299        );
1300        let wrong_definition = engine_entry(
1301            "wrong-definition",
1302            other_ref,
1303            "other",
1304            ProcessStatus::Completed {
1305                await_output: ProcessAwaitOutput::from_tool_output(crate::ToolCallOutput::success(
1306                    json!(true),
1307                )),
1308            },
1309        );
1310
1311        assert_eq!(filter.list_mode(), ProcessListMode::All);
1312        assert!(filter.matches_entry(&matching));
1313        assert!(!filter.matches_entry(&wrong_definition));
1314    }
1315
1316    #[test]
1317    fn process_list_filter_matches_waiting_facet() {
1318        let process_ref = process_value("target", 0, "target");
1319        let mut waiting_entry = engine_entry(
1320            "waiting",
1321            process_ref.clone(),
1322            "target",
1323            ProcessStatus::Running,
1324        );
1325        waiting_entry.1.wait = Some(WaitState {
1326            since_ms: 42,
1327            kind: WaitKind::Signal {
1328                name: "ready".to_string(),
1329                event_type: "signal.ready".to_string(),
1330                key: "process:waiting:signal.ready:1".to_string(),
1331                ordinal: 1,
1332            },
1333        });
1334        let idle_entry = engine_entry("idle", process_ref, "target", ProcessStatus::Running);
1335        let waiting_filter =
1336            ProcessListFilter::decode(&json!({ "waiting": true })).expect("decode waiting filter");
1337        let idle_filter =
1338            ProcessListFilter::decode(&json!({ "waiting": false })).expect("decode idle filter");
1339
1340        assert_eq!(waiting_filter.list_mode(), ProcessListMode::Live);
1341        assert!(waiting_filter.matches_entry(&waiting_entry));
1342        assert!(!waiting_filter.matches_entry(&idle_entry));
1343        assert!(!idle_filter.matches_entry(&waiting_entry));
1344        assert!(idle_filter.matches_entry(&idle_entry));
1345        assert!(
1346            ProcessListFilter::decode(&json!({ "waiting": "yes" }))
1347                .expect_err("invalid waiting filter")
1348                .contains("must be a boolean")
1349        );
1350    }
1351}