Skip to main content

lash_core/runtime/process/
model.rs

1use std::collections::BTreeMap;
2use std::fmt;
3use std::sync::Arc;
4use std::sync::Mutex;
5
6use serde::{Deserialize, Serialize};
7
8use super::events::{
9    ProcessAwaitOutput, ProcessEventType, ProcessTerminalSemantics, ProcessTerminalState,
10    default_process_event_types,
11};
12use super::time::current_epoch_ms;
13use super::validation::{
14    ensure_core_event_types, process_registration_hash, validate_process_registration,
15};
16
17pub type ProcessId = String;
18
19#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize)]
20#[serde(transparent)]
21pub struct SessionScopeId(String);
22
23impl SessionScopeId {
24    pub fn new(value: impl Into<String>) -> Self {
25        Self(value.into())
26    }
27
28    pub fn as_str(&self) -> &str {
29        &self.0
30    }
31}
32
33impl fmt::Display for SessionScopeId {
34    fn fmt(&self, formatter: &mut fmt::Formatter<'_>) -> fmt::Result {
35        formatter.write_str(&self.0)
36    }
37}
38
39impl From<String> for SessionScopeId {
40    fn from(value: String) -> Self {
41        Self::new(value)
42    }
43}
44
45impl From<&str> for SessionScopeId {
46    fn from(value: &str) -> Self {
47        Self::new(value)
48    }
49}
50
51/// Durable executable input for a process.
52///
53/// `ToolCall`, `SessionTurn`, and `External` are kernel process primitives:
54/// core owns their durable representation and execution semantics because they
55/// are how the runtime coordinates tools, child sessions, and externally
56/// completed work. `Engine` is the extension point for deployment-specific
57/// process runtimes; those rows require a matching [`crate::ProcessEngine`] in
58/// the host's process engine registry.
59#[derive(Debug, Serialize, Deserialize)]
60#[serde(tag = "type", rename_all = "snake_case")]
61pub enum ProcessInput {
62    ToolCall {
63        call: crate::PreparedToolCall,
64    },
65    Engine {
66        kind: String,
67        #[serde(default)]
68        payload: serde_json::Value,
69    },
70    SessionTurn {
71        create_request: Box<crate::SessionCreateRequest>,
72        turn_input: Box<crate::TurnInput>,
73        output_contract: crate::ToolOutputContract,
74    },
75    External {
76        #[serde(default)]
77        metadata: serde_json::Value,
78    },
79}
80
81impl Clone for ProcessInput {
82    fn clone(&self) -> Self {
83        match self {
84            Self::ToolCall { call } => Self::ToolCall { call: call.clone() },
85            Self::Engine { kind, payload } => Self::Engine {
86                kind: kind.clone(),
87                payload: payload.clone(),
88            },
89            Self::SessionTurn {
90                create_request,
91                turn_input,
92                output_contract,
93            } => Self::SessionTurn {
94                create_request: create_request.clone(),
95                turn_input: turn_input.clone(),
96                output_contract: output_contract.clone(),
97            },
98            Self::External { metadata } => Self::External {
99                metadata: metadata.clone(),
100            },
101        }
102    }
103}
104
105impl PartialEq for ProcessInput {
106    fn eq(&self, other: &Self) -> bool {
107        serde_json::to_value(self).ok() == serde_json::to_value(other).ok()
108    }
109}
110
111impl ProcessInput {
112    pub fn engine_kind(&self) -> &'static str {
113        match self {
114            Self::ToolCall { .. } => "tool",
115            Self::Engine { .. } => "engine",
116            Self::SessionTurn { .. } => "session_turn",
117            Self::External { .. } => "external",
118        }
119    }
120
121    pub fn engine_specific_kind(&self) -> Option<&str> {
122        match self {
123            Self::Engine { kind, .. } => Some(kind.as_str()),
124            _ => None,
125        }
126    }
127}
128
129#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
130#[serde(transparent)]
131pub struct ProcessExecutionEnvRef(String);
132
133impl ProcessExecutionEnvRef {
134    pub fn new(value: impl Into<String>) -> Self {
135        Self(value.into())
136    }
137
138    pub fn as_str(&self) -> &str {
139        &self.0
140    }
141}
142
143impl fmt::Display for ProcessExecutionEnvRef {
144    fn fmt(&self, formatter: &mut fmt::Formatter<'_>) -> fmt::Result {
145        formatter.write_str(&self.0)
146    }
147}
148
149#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
150#[serde(deny_unknown_fields)]
151pub struct ProcessExecutionEnvSpec {
152    #[serde(default)]
153    pub plugin_options: crate::PluginOptions,
154    #[serde(default)]
155    pub policy: crate::SessionPolicy,
156}
157
158impl ProcessExecutionEnvSpec {
159    pub fn new(plugin_options: crate::PluginOptions, policy: crate::SessionPolicy) -> Self {
160        Self {
161            plugin_options,
162            policy,
163        }
164    }
165
166    pub fn stable_ref(&self) -> Result<ProcessExecutionEnvRef, serde_json::Error> {
167        crate::stable_hash::stable_json_sha256_hex(self)
168            .map(|hash| ProcessExecutionEnvRef::new(format!("process-env:sha256:{hash}")))
169    }
170
171    pub fn to_store_bytes(&self) -> Result<Vec<u8>, serde_json::Error> {
172        serde_json::to_vec(self)
173    }
174
175    pub fn from_store_bytes(bytes: &[u8]) -> Result<Self, serde_json::Error> {
176        serde_json::from_slice(bytes)
177    }
178}
179
180#[async_trait::async_trait]
181pub trait ProcessExecutionEnvStore: Send + Sync {
182    fn durability_tier(&self) -> crate::DurabilityTier {
183        crate::DurabilityTier::Inline
184    }
185
186    async fn put_process_execution_env(
187        &self,
188        env_ref: &ProcessExecutionEnvRef,
189        bytes: &[u8],
190    ) -> Result<(), crate::PluginError>;
191
192    async fn get_process_execution_env(
193        &self,
194        env_ref: &ProcessExecutionEnvRef,
195    ) -> Result<Option<Vec<u8>>, crate::PluginError>;
196}
197
198#[derive(Default)]
199pub struct InMemoryProcessExecutionEnvStore {
200    envs: Mutex<BTreeMap<String, Vec<u8>>>,
201}
202
203impl InMemoryProcessExecutionEnvStore {
204    pub fn new() -> Self {
205        Self::default()
206    }
207}
208
209#[async_trait::async_trait]
210impl ProcessExecutionEnvStore for InMemoryProcessExecutionEnvStore {
211    async fn put_process_execution_env(
212        &self,
213        env_ref: &ProcessExecutionEnvRef,
214        bytes: &[u8],
215    ) -> Result<(), crate::PluginError> {
216        self.envs
217            .lock()
218            .map_err(|_| {
219                crate::PluginError::Session("process execution env store lock poisoned".to_string())
220            })?
221            .insert(env_ref.as_str().to_string(), bytes.to_vec());
222        Ok(())
223    }
224
225    async fn get_process_execution_env(
226        &self,
227        env_ref: &ProcessExecutionEnvRef,
228    ) -> Result<Option<Vec<u8>>, crate::PluginError> {
229        Ok(self
230            .envs
231            .lock()
232            .map_err(|_| {
233                crate::PluginError::Session("process execution env store lock poisoned".to_string())
234            })?
235            .get(env_ref.as_str())
236            .cloned())
237    }
238}
239
240pub async fn persist_process_execution_env(
241    env_store: &dyn ProcessExecutionEnvStore,
242    spec: &ProcessExecutionEnvSpec,
243) -> Result<ProcessExecutionEnvRef, crate::PluginError> {
244    let env_ref = spec.stable_ref().map_err(|err| {
245        crate::PluginError::Session(format!("failed to hash process execution env: {err}"))
246    })?;
247    let bytes = spec.to_store_bytes().map_err(|err| {
248        crate::PluginError::Session(format!("failed to encode process execution env: {err}"))
249    })?;
250    env_store
251        .put_process_execution_env(&env_ref, &bytes)
252        .await?;
253    Ok(env_ref)
254}
255
256pub async fn load_process_execution_env(
257    env_store: &dyn ProcessExecutionEnvStore,
258    env_ref: &ProcessExecutionEnvRef,
259) -> Result<ProcessExecutionEnvSpec, crate::PluginError> {
260    let bytes = env_store
261        .get_process_execution_env(env_ref)
262        .await?
263        .ok_or_else(|| {
264            crate::PluginError::Session(format!("missing process execution env `{env_ref}`"))
265        })?;
266    ProcessExecutionEnvSpec::from_store_bytes(&bytes).map_err(|err| {
267        crate::PluginError::Session(format!(
268            "failed to decode process execution env `{env_ref}`: {err}"
269        ))
270    })
271}
272
273/// Execution-local context for process runners. Durable edges live on the
274/// process record.
275#[derive(Clone, Debug, Default, Serialize, Deserialize)]
276pub struct ProcessExecutionContext {
277    #[serde(default, skip_serializing_if = "Option::is_none")]
278    pub causal_invocation: Option<crate::RuntimeInvocation>,
279}
280
281impl ProcessExecutionContext {
282    pub fn with_causal_invocation(mut self, invocation: Option<crate::RuntimeInvocation>) -> Self {
283        self.causal_invocation = invocation;
284        self
285    }
286
287    pub fn is_empty(&self) -> bool {
288        self.causal_invocation.is_none()
289    }
290}
291
292#[derive(Clone)]
293pub struct ProcessOpScope<'scope> {
294    pub(crate) parent_invocation: Option<crate::RuntimeInvocation>,
295    pub(crate) effect_controller: crate::runtime::RuntimeEffectControllerHandle<'scope>,
296    pub(crate) agent_frame_id: Option<crate::AgentFrameId>,
297    pub(crate) target_agent_frame_id: Option<crate::AgentFrameId>,
298}
299
300impl<'scope> ProcessOpScope<'scope> {
301    pub fn new(scoped_effect_controller: crate::ScopedEffectController<'scope>) -> Self {
302        Self {
303            parent_invocation: None,
304            effect_controller: crate::runtime::RuntimeEffectControllerHandle::borrowed(
305                scoped_effect_controller,
306            ),
307            agent_frame_id: None,
308            target_agent_frame_id: None,
309        }
310    }
311
312    pub fn with_parent_invocation(
313        mut self,
314        parent_invocation: Option<crate::RuntimeInvocation>,
315    ) -> Self {
316        self.parent_invocation = parent_invocation;
317        self
318    }
319
320    pub fn with_agent_frame_id(mut self, agent_frame_id: Option<crate::AgentFrameId>) -> Self {
321        self.agent_frame_id = agent_frame_id;
322        self
323    }
324
325    pub fn with_target_agent_frame_id(
326        mut self,
327        agent_frame_id: Option<crate::AgentFrameId>,
328    ) -> Self {
329        self.target_agent_frame_id = agent_frame_id;
330        self
331    }
332
333    pub fn agent_frame_id(&self) -> Option<&str> {
334        self.agent_frame_id.as_deref()
335    }
336
337    pub fn target_agent_frame_id(&self) -> Option<&str> {
338        self.target_agent_frame_id.as_deref()
339    }
340
341    pub(crate) fn controller(&self) -> &dyn crate::RuntimeEffectController {
342        self.effect_controller.controller()
343    }
344}
345
346#[derive(Clone, Debug, Default)]
347pub struct ProcessStartOptions {
348    pub descriptor: Option<ProcessHandleDescriptor>,
349    /// Runtime-internal spawn provenance override. Set by process execution
350    /// contexts so children started *by a process* inherit the parent's
351    /// originator and wake target instead of being stamped with the ephemeral
352    /// execution scope. `None` means the session start path stamps the
353    /// creating session (the in-session meaning of "start"). This rides
354    /// options — not the request — so in-session callers cannot forge
355    /// provenance through the session surface.
356    pub spawn_provenance: Option<ProcessSpawnProvenance>,
357}
358
359/// Provenance a process-run context hands to its children: the chain's
360/// originator and the chain's wake target. Mirrors the trigger fire path,
361/// where the spawned process inherits the registrant and the grant is derived
362/// from the wake target.
363#[derive(Clone, Debug, PartialEq, Eq)]
364pub struct ProcessSpawnProvenance {
365    pub originator: ProcessOriginator,
366    pub wake_target: Option<SessionScope>,
367}
368
369impl ProcessStartOptions {
370    pub fn new() -> Self {
371        Self::default()
372    }
373
374    pub fn with_descriptor(mut self, descriptor: ProcessHandleDescriptor) -> Self {
375        self.descriptor = Some(descriptor);
376        self
377    }
378
379    pub fn with_optional_descriptor(mut self, descriptor: Option<ProcessHandleDescriptor>) -> Self {
380        self.descriptor = descriptor;
381        self
382    }
383
384    pub fn with_spawn_provenance(mut self, spawn_provenance: ProcessSpawnProvenance) -> Self {
385        self.spawn_provenance = Some(spawn_provenance);
386        self
387    }
388
389    pub fn execution_context(&self, scope: &ProcessOpScope<'_>) -> ProcessExecutionContext {
390        ProcessExecutionContext {
391            causal_invocation: scope.parent_invocation.clone(),
392        }
393    }
394}
395
396/// Public host-facing request for starting a visible process handle.
397#[derive(Clone, Debug, Serialize, Deserialize)]
398pub struct ProcessStartRequest {
399    pub id: ProcessId,
400    pub input: ProcessInput,
401    #[serde(default, skip_serializing_if = "Option::is_none")]
402    pub env_spec: Option<ProcessExecutionEnvSpec>,
403    pub originator: ProcessOriginator,
404    #[serde(default, skip_serializing_if = "Option::is_none")]
405    pub wake_target: Option<SessionScope>,
406    #[serde(default, skip_serializing_if = "Option::is_none")]
407    pub grant: Option<ProcessStartGrant>,
408    #[serde(default)]
409    pub event_types: Vec<ProcessEventType>,
410}
411
412impl ProcessStartRequest {
413    pub fn new(
414        id: impl Into<ProcessId>,
415        input: ProcessInput,
416        originator: ProcessOriginator,
417    ) -> Self {
418        Self {
419            id: id.into(),
420            input,
421            env_spec: None,
422            originator,
423            wake_target: None,
424            grant: None,
425            event_types: default_process_event_types(),
426        }
427    }
428
429    pub fn external(
430        id: impl Into<ProcessId>,
431        originator: ProcessOriginator,
432        metadata: serde_json::Value,
433    ) -> Self {
434        Self::new(id, ProcessInput::External { metadata }, originator)
435    }
436
437    pub fn with_env_spec(mut self, env_spec: ProcessExecutionEnvSpec) -> Self {
438        self.env_spec = Some(env_spec);
439        self
440    }
441
442    pub fn with_wake_target(mut self, wake_target: Option<SessionScope>) -> Self {
443        self.wake_target = wake_target;
444        self
445    }
446
447    pub fn with_grant(mut self, grant: Option<ProcessStartGrant>) -> Self {
448        self.grant = grant;
449        self
450    }
451
452    pub fn with_event_types(
453        mut self,
454        event_types: impl IntoIterator<Item = ProcessEventType>,
455    ) -> Self {
456        self.event_types = event_types.into_iter().collect();
457        self
458    }
459
460    pub fn with_extra_event_types(
461        mut self,
462        event_types: impl IntoIterator<Item = ProcessEventType>,
463    ) -> Self {
464        self.event_types.extend(event_types);
465        self
466    }
467
468    pub fn into_registration(self, env_ref: Option<ProcessExecutionEnvRef>) -> ProcessRegistration {
469        ProcessRegistration::new(self.id, self.input, ProcessProvenance::new(self.originator))
470            .with_event_types(self.event_types)
471            .with_execution_env_ref(env_ref)
472            .with_wake_target(self.wake_target)
473    }
474}
475
476#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
477pub struct SessionScope {
478    pub session_id: String,
479    #[serde(default, skip_serializing_if = "Option::is_none")]
480    pub agent_frame_id: Option<crate::AgentFrameId>,
481}
482
483#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
484pub struct ProcessProvenance {
485    pub originator: ProcessOriginator,
486    #[serde(default, skip_serializing_if = "Option::is_none")]
487    pub caused_by: Option<crate::CausalRef>,
488}
489
490impl ProcessProvenance {
491    pub fn new(originator: ProcessOriginator) -> Self {
492        Self {
493            originator,
494            caused_by: None,
495        }
496    }
497
498    pub fn host() -> Self {
499        Self::new(ProcessOriginator::host())
500    }
501
502    pub fn session(scope: SessionScope) -> Self {
503        Self::new(ProcessOriginator::session(scope))
504    }
505
506    pub fn with_caused_by(mut self, caused_by: Option<crate::CausalRef>) -> Self {
507        self.caused_by = caused_by;
508        self
509    }
510}
511
512#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
513#[serde(tag = "type", rename_all = "snake_case")]
514pub enum ProcessOriginator {
515    Host,
516    Session { scope: SessionScope },
517}
518
519impl ProcessOriginator {
520    pub fn host() -> Self {
521        Self::Host
522    }
523
524    pub fn session(scope: SessionScope) -> Self {
525        Self::Session { scope }
526    }
527
528    pub fn scope_id(&self) -> String {
529        match self {
530            Self::Host => "host".to_string(),
531            Self::Session { scope } => scope.id().to_string(),
532        }
533    }
534}
535
536impl SessionScope {
537    pub fn new(session_id: impl Into<String>) -> Self {
538        Self {
539            session_id: session_id.into(),
540            agent_frame_id: None,
541        }
542    }
543
544    pub fn for_agent_frame(
545        session_id: impl Into<String>,
546        agent_frame_id: impl Into<crate::AgentFrameId>,
547    ) -> Self {
548        Self {
549            session_id: session_id.into(),
550            agent_frame_id: Some(agent_frame_id.into()),
551        }
552    }
553
554    pub fn id(&self) -> SessionScopeId {
555        match self.agent_frame_id.as_deref() {
556            Some(frame_id) if !frame_id.is_empty() => {
557                SessionScopeId::new(format!("session:{}/frame:{frame_id}", self.session_id))
558            }
559            _ => SessionScopeId::new(format!("session:{}", self.session_id)),
560        }
561    }
562
563    pub fn is_empty(&self) -> bool {
564        self.session_id.is_empty()
565    }
566}
567
568/// Serializable process spec used to start or recover a runtime process.
569#[derive(Debug, Serialize, Deserialize)]
570pub struct ProcessRegistration {
571    pub id: ProcessId,
572    pub input: Arc<ProcessInput>,
573    pub identity: ProcessIdentity,
574    #[serde(default)]
575    pub event_types: Vec<ProcessEventType>,
576    pub provenance: ProcessProvenance,
577    #[serde(default, skip_serializing_if = "Option::is_none")]
578    pub env_ref: Option<ProcessExecutionEnvRef>,
579    #[serde(default, skip_serializing_if = "Option::is_none")]
580    pub wake_target: Option<SessionScope>,
581}
582
583impl Clone for ProcessRegistration {
584    fn clone(&self) -> Self {
585        Self {
586            id: self.id.clone(),
587            input: Arc::clone(&self.input),
588            identity: self.identity.clone(),
589            event_types: self.event_types.clone(),
590            provenance: self.provenance.clone(),
591            env_ref: self.env_ref.clone(),
592            wake_target: self.wake_target.clone(),
593        }
594    }
595}
596
597impl ProcessRegistration {
598    pub fn new(
599        id: impl Into<ProcessId>,
600        input: ProcessInput,
601        provenance: ProcessProvenance,
602    ) -> Self {
603        let identity = ProcessIdentity::from_process_input(&input);
604        Self {
605            id: id.into(),
606            input: Arc::new(input),
607            identity,
608            event_types: default_process_event_types(),
609            provenance,
610            env_ref: None,
611            wake_target: None,
612        }
613    }
614
615    pub(crate) fn session_start_draft(id: impl Into<ProcessId>, input: ProcessInput) -> Self {
616        Self::new(id, input, ProcessProvenance::host())
617    }
618
619    pub fn with_process_provenance(mut self, provenance: ProcessProvenance) -> Self {
620        self.provenance = provenance;
621        self
622    }
623
624    pub fn with_execution_env_ref(mut self, env_ref: Option<ProcessExecutionEnvRef>) -> Self {
625        self.env_ref = env_ref;
626        self
627    }
628
629    pub fn with_wake_target(mut self, wake_target: Option<SessionScope>) -> Self {
630        self.wake_target = wake_target;
631        self
632    }
633
634    pub fn with_identity(mut self, identity: ProcessIdentity) -> Self {
635        self.identity = identity;
636        self
637    }
638
639    pub fn with_event_types(
640        mut self,
641        event_types: impl IntoIterator<Item = ProcessEventType>,
642    ) -> Self {
643        self.event_types = event_types.into_iter().collect();
644        self
645    }
646
647    pub fn with_extra_event_types(
648        mut self,
649        event_types: impl IntoIterator<Item = ProcessEventType>,
650    ) -> Self {
651        self.event_types.extend(event_types);
652        self
653    }
654}
655
656#[derive(Clone, Debug, Default, PartialEq, Serialize, Deserialize)]
657#[serde(tag = "state", rename_all = "snake_case")]
658pub enum ProcessStatus {
659    #[default]
660    Running,
661    Completed {
662        await_output: ProcessAwaitOutput,
663    },
664    Failed {
665        await_output: ProcessAwaitOutput,
666    },
667    Cancelled {
668        await_output: ProcessAwaitOutput,
669    },
670}
671
672impl ProcessStatus {
673    pub fn from_terminal(terminal: ProcessTerminalSemantics) -> Self {
674        match terminal.state {
675            ProcessTerminalState::Completed => Self::Completed {
676                await_output: terminal.await_output,
677            },
678            ProcessTerminalState::Failed => Self::Failed {
679                await_output: terminal.await_output,
680            },
681            ProcessTerminalState::Cancelled => Self::Cancelled {
682                await_output: terminal.await_output,
683            },
684        }
685    }
686
687    pub fn is_terminal(&self) -> bool {
688        !matches!(self, Self::Running)
689    }
690
691    pub fn label(&self) -> &'static str {
692        match self {
693            Self::Running => "running",
694            Self::Completed { .. } => "completed",
695            Self::Failed { .. } => "failed",
696            Self::Cancelled { .. } => "cancelled",
697        }
698    }
699
700    pub fn terminal_state(&self) -> Option<ProcessTerminalState> {
701        match self {
702            Self::Running => None,
703            Self::Completed { .. } => Some(ProcessTerminalState::Completed),
704            Self::Failed { .. } => Some(ProcessTerminalState::Failed),
705            Self::Cancelled { .. } => Some(ProcessTerminalState::Cancelled),
706        }
707    }
708
709    pub fn await_output(&self) -> Option<&ProcessAwaitOutput> {
710        match self {
711            Self::Running => None,
712            Self::Completed { await_output }
713            | Self::Failed { await_output }
714            | Self::Cancelled { await_output } => Some(await_output),
715        }
716    }
717
718    pub fn terminal_semantics(&self) -> Option<ProcessTerminalSemantics> {
719        Some(ProcessTerminalSemantics {
720            state: self.terminal_state()?,
721            await_output: self.await_output()?.clone(),
722        })
723    }
724}
725
726/// Durable process row. Session-visible addressability lives in
727/// [`ProcessHandleGrant`], not in the process record.
728#[derive(Clone, Debug, Serialize, Deserialize)]
729pub struct ProcessRecord {
730    pub id: ProcessId,
731    pub registration_hash: String,
732    pub input: Arc<ProcessInput>,
733    pub identity: ProcessIdentity,
734    #[serde(default)]
735    pub event_types: Vec<ProcessEventType>,
736    pub provenance: ProcessProvenance,
737    #[serde(default, skip_serializing_if = "Option::is_none")]
738    pub env_ref: Option<ProcessExecutionEnvRef>,
739    #[serde(default, skip_serializing_if = "Option::is_none")]
740    pub wake_target: Option<SessionScope>,
741    #[serde(default)]
742    pub created_at_ms: u64,
743    #[serde(default)]
744    pub updated_at_ms: u64,
745    #[serde(default, skip_serializing_if = "Option::is_none")]
746    pub external_ref: Option<ProcessExternalRef>,
747    #[serde(default, skip_serializing_if = "Option::is_none")]
748    pub wait: Option<WaitState>,
749    #[serde(default)]
750    pub status: ProcessStatus,
751}
752
753#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
754pub struct WaitState {
755    pub kind: WaitKind,
756    pub since_ms: u64,
757}
758
759#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
760#[serde(tag = "kind", rename_all = "snake_case")]
761pub enum WaitKind {
762    Signal {
763        name: String,
764        event_type: String,
765        key: String,
766        ordinal: u64,
767    },
768}
769
770impl ProcessRecord {
771    pub fn from_registration(mut registration: ProcessRegistration) -> Self {
772        ensure_core_event_types(&mut registration);
773        validate_process_registration(&registration)
774            .expect("process registration should be valid before record construction");
775        let registration_hash = process_registration_hash(&registration)
776            .expect("process registration should hash before record construction");
777        Self::from_prepared_registration(registration, registration_hash, current_epoch_ms())
778    }
779
780    pub fn from_prepared_registration(
781        registration: ProcessRegistration,
782        registration_hash: String,
783        now_ms: u64,
784    ) -> Self {
785        Self {
786            id: registration.id,
787            registration_hash,
788            input: registration.input,
789            identity: registration.identity,
790            event_types: registration.event_types,
791            provenance: registration.provenance,
792            env_ref: registration.env_ref,
793            wake_target: registration.wake_target,
794            created_at_ms: now_ms,
795            updated_at_ms: now_ms,
796            external_ref: None,
797            wait: None,
798            status: ProcessStatus::Running,
799        }
800    }
801
802    pub fn is_terminal(&self) -> bool {
803        self.status.is_terminal()
804    }
805
806    pub fn clear_wake_target_for_session(&mut self, session_id: &str) -> bool {
807        let should_clear = self
808            .wake_target
809            .as_ref()
810            .is_some_and(|scope| scope.session_id == session_id);
811        if should_clear {
812            self.wake_target = None;
813            self.updated_at_ms = current_epoch_ms();
814        }
815        should_clear
816    }
817
818    pub fn originator_scope_id(&self) -> String {
819        self.provenance.originator.scope_id()
820    }
821}
822
823/// Canonical process identity stored alongside every durable process row.
824///
825/// `ProcessInput::Engine` keeps its payload opaque to core. Engines therefore
826/// publish their visible kind, display label, and definition identity at the
827/// registration boundary; list, summary, trigger, and observation paths read
828/// this durable field instead of decoding engine payload conventions.
829#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
830pub struct ProcessIdentity {
831    pub kind: String,
832    #[serde(default, skip_serializing_if = "Option::is_none")]
833    pub label: Option<String>,
834    #[serde(default, skip_serializing_if = "Option::is_none")]
835    pub definition: Option<serde_json::Value>,
836}
837
838impl ProcessIdentity {
839    pub fn new(kind: impl Into<String>) -> Self {
840        Self {
841            kind: kind.into(),
842            label: None,
843            definition: None,
844        }
845    }
846
847    pub fn with_label(mut self, label: Option<impl Into<String>>) -> Self {
848        self.label = label.map(Into::into);
849        self
850    }
851
852    pub fn with_definition(mut self, definition: Option<serde_json::Value>) -> Self {
853        self.definition = definition;
854        self
855    }
856
857    pub fn from_process_input(input: &ProcessInput) -> Self {
858        match input {
859            ProcessInput::ToolCall { call } => {
860                Self::new("tool").with_label(Some(call.tool_name.clone()))
861            }
862            ProcessInput::Engine { kind, .. } => Self::new(kind.clone()),
863            ProcessInput::SessionTurn { create_request, .. } => {
864                let label = create_request
865                    .subagent
866                    .as_ref()
867                    .map(|subagent| subagent.capability.clone())
868                    .or_else(|| create_request.usage_source.clone())
869                    .or_else(|| create_request.session_id.clone());
870                Self::new("session_turn").with_label(label)
871            }
872            ProcessInput::External { metadata } => {
873                let label = metadata
874                    .get("label")
875                    .or_else(|| metadata.get("name"))
876                    .or_else(|| metadata.get("title"))
877                    .and_then(serde_json::Value::as_str)
878                    .map(str::to_string);
879                Self::new("external").with_label(label)
880            }
881        }
882    }
883}
884
885/// Wire-format version stamped on every persisted [`ProcessLease`].
886///
887/// Bump when the on-wire shape of `ProcessLease` changes in a way that older
888/// code cannot safely deserialize.
889pub const PROCESS_LEASE_SCHEMA_VERSION: u32 = 1;
890
891/// Durable lease over a non-terminal background process.
892///
893/// The lease pair `(owner_id, lease_token)` plus `fencing_token` are how lash guarantees that
894/// one non-terminal process is re-executed by exactly one worker at a time —
895/// even after a crash, even across two workers that both sweep the same
896/// registry for recoverable work. The durable backend
897/// (`lash-sqlite-store`) uses these to serialize concurrent claims on the same
898/// `process_id`; future distributed durable backends use the *same* fields to
899/// coordinate workers that don't share a file system.
900///
901/// **This is not single-process theatre.** The owner / fencing-token /
902/// lease-token triple is the public contract that lets any backend detect and
903/// reject stale writers. Treat it as load-bearing, not defensive.
904#[derive(Clone, Debug, Serialize, Deserialize)]
905pub struct ProcessLease {
906    pub schema_version: u32,
907    pub process_id: ProcessId,
908    pub owner_id: String,
909    pub lease_token: String,
910    pub fencing_token: u64,
911    pub claimed_at_epoch_ms: u64,
912    pub expires_at_epoch_ms: u64,
913}
914
915#[derive(Clone, Debug, Serialize, Deserialize)]
916pub struct ProcessLeaseCompletion {
917    pub process_id: ProcessId,
918    pub lease_token: String,
919}
920
921impl ProcessLeaseCompletion {
922    pub fn from_lease(lease: &ProcessLease) -> Self {
923        Self {
924            process_id: lease.process_id.clone(),
925            lease_token: lease.lease_token.clone(),
926        }
927    }
928}
929
930/// Durable backend reference for background work accepted outside the local process.
931#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq)]
932pub struct ProcessExternalRef {
933    pub backend: String,
934    pub id: String,
935    #[serde(default, skip_serializing_if = "Option::is_none")]
936    pub metadata: Option<serde_json::Value>,
937}
938
939#[derive(Clone, Debug, Default, PartialEq, Eq, Serialize, Deserialize)]
940pub struct ProcessHandleDescriptor {
941    #[serde(default, skip_serializing_if = "Option::is_none")]
942    pub kind: Option<String>,
943    #[serde(default, skip_serializing_if = "Option::is_none")]
944    pub label: Option<String>,
945}
946
947impl ProcessHandleDescriptor {
948    pub fn new(kind: Option<impl Into<String>>, label: Option<impl Into<String>>) -> Self {
949        Self {
950            kind: kind.map(Into::into),
951            label: label.map(Into::into),
952        }
953    }
954}
955
956#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
957pub struct ProcessHandleGrant {
958    pub session_id: String,
959    pub process_id: ProcessId,
960    pub descriptor: ProcessHandleDescriptor,
961}
962
963pub type ProcessHandleGrantEntry = (ProcessHandleGrant, ProcessRecord);
964
965#[derive(Clone, Copy, Debug, Default, PartialEq, Eq, Serialize, Deserialize)]
966#[serde(rename_all = "snake_case")]
967pub enum ProcessLifecycleStatus {
968    #[default]
969    Running,
970    Completed,
971    Failed,
972    Cancelled,
973}
974
975impl ProcessLifecycleStatus {
976    pub fn label(self) -> &'static str {
977        match self {
978            Self::Running => "running",
979            Self::Completed => "completed",
980            Self::Failed => "failed",
981            Self::Cancelled => "cancelled",
982        }
983    }
984
985    pub fn is_terminal(self) -> bool {
986        !matches!(self, Self::Running)
987    }
988
989    pub fn terminal_state(self) -> Option<ProcessTerminalState> {
990        match self {
991            Self::Running => None,
992            Self::Completed => Some(ProcessTerminalState::Completed),
993            Self::Failed => Some(ProcessTerminalState::Failed),
994            Self::Cancelled => Some(ProcessTerminalState::Cancelled),
995        }
996    }
997}
998
999impl From<&ProcessStatus> for ProcessLifecycleStatus {
1000    fn from(status: &ProcessStatus) -> Self {
1001        match status {
1002            ProcessStatus::Running => Self::Running,
1003            ProcessStatus::Completed { .. } => Self::Completed,
1004            ProcessStatus::Failed { .. } => Self::Failed,
1005            ProcessStatus::Cancelled { .. } => Self::Cancelled,
1006        }
1007    }
1008}
1009
1010impl From<ProcessStatus> for ProcessLifecycleStatus {
1011    fn from(status: ProcessStatus) -> Self {
1012        Self::from(&status)
1013    }
1014}
1015
1016#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
1017pub struct ProcessHandleSummary {
1018    #[serde(rename = "__handle__")]
1019    pub handle_type: String,
1020    pub id: ProcessId,
1021    pub process_id: ProcessId,
1022    pub descriptor: ProcessHandleDescriptor,
1023    #[serde(default, skip_serializing_if = "Option::is_none")]
1024    pub definition: Option<serde_json::Value>,
1025    pub status: ProcessLifecycleStatus,
1026}
1027
1028impl ProcessHandleSummary {
1029    pub fn new(
1030        process_id: impl Into<ProcessId>,
1031        descriptor: ProcessHandleDescriptor,
1032        status: ProcessLifecycleStatus,
1033    ) -> Self {
1034        let process_id = process_id.into();
1035        Self {
1036            handle_type: "process".to_string(),
1037            id: process_id.clone(),
1038            process_id,
1039            descriptor,
1040            definition: None,
1041            status,
1042        }
1043    }
1044
1045    pub fn with_definition(mut self, definition: Option<serde_json::Value>) -> Self {
1046        self.definition = definition;
1047        self
1048    }
1049
1050    pub fn from_grant_record(grant: ProcessHandleGrant, record: ProcessRecord) -> Self {
1051        let definition = record.identity.definition.clone();
1052        Self::new(
1053            record.id,
1054            grant.descriptor,
1055            ProcessLifecycleStatus::from(record.status),
1056        )
1057        .with_definition(definition)
1058    }
1059}
1060
1061impl From<ProcessHandleGrantEntry> for ProcessHandleSummary {
1062    fn from((grant, record): ProcessHandleGrantEntry) -> Self {
1063        Self::from_grant_record(grant, record)
1064    }
1065}
1066
1067#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
1068pub struct ProcessCancelSummary {
1069    pub process_id: ProcessId,
1070    pub status: ProcessLifecycleStatus,
1071}
1072
1073impl ProcessCancelSummary {
1074    pub fn from_record(record: ProcessRecord) -> Self {
1075        Self {
1076            process_id: record.id,
1077            status: ProcessLifecycleStatus::from(record.status),
1078        }
1079    }
1080}
1081
1082#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)]
1083pub enum ProcessStatusFilter {
1084    #[default]
1085    Running,
1086    Completed,
1087    Failed,
1088    Cancelled,
1089    Any,
1090}
1091
1092impl ProcessStatusFilter {
1093    pub fn decode(value: Option<&str>) -> Result<Self, String> {
1094        match value.unwrap_or("running") {
1095            "running" => Ok(Self::Running),
1096            "completed" => Ok(Self::Completed),
1097            "failed" => Ok(Self::Failed),
1098            "cancelled" => Ok(Self::Cancelled),
1099            "any" => Ok(Self::Any),
1100            other => Err(format!(
1101                "processes.list status must be `running`, `completed`, `failed`, `cancelled`, or `any`, got `{other}`"
1102            )),
1103        }
1104    }
1105
1106    pub fn list_mode(self) -> ProcessListMode {
1107        match self {
1108            Self::Running => ProcessListMode::Live,
1109            Self::Completed | Self::Failed | Self::Cancelled | Self::Any => ProcessListMode::All,
1110        }
1111    }
1112
1113    pub fn matches(self, status: ProcessLifecycleStatus) -> bool {
1114        match self {
1115            Self::Running => status == ProcessLifecycleStatus::Running,
1116            Self::Completed => status == ProcessLifecycleStatus::Completed,
1117            Self::Failed => status == ProcessLifecycleStatus::Failed,
1118            Self::Cancelled => status == ProcessLifecycleStatus::Cancelled,
1119            Self::Any => true,
1120        }
1121    }
1122}
1123
1124#[derive(Clone, Debug, Default, PartialEq)]
1125pub struct ProcessListFilter {
1126    pub definition: Option<serde_json::Value>,
1127    pub status: ProcessStatusFilter,
1128    pub waiting: Option<bool>,
1129}
1130
1131impl ProcessListFilter {
1132    pub fn decode(args: &serde_json::Value) -> Result<Self, String> {
1133        let map = args
1134            .as_object()
1135            .ok_or_else(|| "processes.list expects a record of process filters".to_string())?;
1136        for key in map.keys() {
1137            match key.as_str() {
1138                "definition" | "status" | "waiting" => {}
1139                _ => return Err(format!("processes.list unknown filter `{key}`")),
1140            }
1141        }
1142        let definition = args.get("definition").cloned();
1143        let status =
1144            ProcessStatusFilter::decode(args.get("status").and_then(serde_json::Value::as_str))?;
1145        let waiting = args
1146            .get("waiting")
1147            .map(|value| {
1148                value
1149                    .as_bool()
1150                    .ok_or_else(|| "processes.list `waiting` filter must be a boolean".to_string())
1151            })
1152            .transpose()?;
1153        Ok(Self {
1154            definition,
1155            status,
1156            waiting,
1157        })
1158    }
1159
1160    pub fn list_mode(&self) -> ProcessListMode {
1161        self.status.list_mode()
1162    }
1163
1164    pub fn matches_entry(&self, entry: &ProcessHandleGrantEntry) -> bool {
1165        let (_grant, record) = entry;
1166        self.matches_record(record)
1167    }
1168
1169    pub fn matches_record(&self, record: &ProcessRecord) -> bool {
1170        let status = ProcessLifecycleStatus::from(&record.status);
1171        self.status.matches(status)
1172            && self
1173                .definition
1174                .as_ref()
1175                .is_none_or(|definition| record.identity.definition.as_ref() == Some(definition))
1176            && self
1177                .waiting
1178                .is_none_or(|waiting| record.wait.is_some() == waiting)
1179    }
1180}
1181
1182#[derive(Clone, Copy, Debug, Default, PartialEq, Eq, Serialize, Deserialize)]
1183#[serde(rename_all = "snake_case")]
1184pub enum ProcessListMode {
1185    #[default]
1186    Live,
1187    All,
1188}
1189
1190impl ProcessListMode {
1191    pub fn as_str(self) -> &'static str {
1192        match self {
1193            Self::Live => "live",
1194            Self::All => "all",
1195        }
1196    }
1197}
1198
1199#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
1200pub struct ProcessStartGrant {
1201    pub session_scope: SessionScope,
1202    pub descriptor: ProcessHandleDescriptor,
1203}
1204
1205#[derive(Clone, Debug, Default, PartialEq, Eq, Serialize, Deserialize)]
1206pub struct ProcessSessionDeleteReport {
1207    pub session_id: String,
1208    pub revoked_handle_count: usize,
1209    pub deleted_wake_count: usize,
1210    pub orphaned_process_ids: Vec<String>,
1211    pub preserved_process_ids: Vec<String>,
1212}
1213
1214#[cfg(test)]
1215mod tests {
1216    use serde_json::json;
1217
1218    use super::*;
1219
1220    fn process_value(component: &str, pos: usize, name: &str) -> serde_json::Value {
1221        json!({
1222            "component": component,
1223            "pos": pos,
1224            "name": name,
1225        })
1226    }
1227
1228    fn engine_entry(
1229        process_id: &str,
1230        definition: serde_json::Value,
1231        process_name: &str,
1232        status: ProcessStatus,
1233    ) -> ProcessHandleGrantEntry {
1234        let mut record = ProcessRecord::from_registration(
1235            ProcessRegistration::new(
1236                process_id,
1237                ProcessInput::Engine {
1238                    kind: "test-engine".to_string(),
1239                    payload: json!({
1240                        "definition": definition.clone(),
1241                        "label": process_name,
1242                    }),
1243                },
1244                ProcessProvenance::host(),
1245            )
1246            .with_identity(
1247                ProcessIdentity::new("test-engine")
1248                    .with_label(Some(process_name))
1249                    .with_definition(Some(definition)),
1250            )
1251            .with_execution_env_ref(Some(ProcessExecutionEnvRef::new(format!(
1252                "process-env:test:{process_id}"
1253            )))),
1254        );
1255        record.status = status;
1256        (
1257            ProcessHandleGrant {
1258                session_id: "session".to_string(),
1259                process_id: process_id.to_string(),
1260                descriptor: ProcessHandleDescriptor::new(Some("test-engine"), Some(process_name)),
1261            },
1262            record,
1263        )
1264    }
1265
1266    #[test]
1267    fn process_list_filter_matches_definition_and_status() {
1268        let target_ref = process_value("target", 0, "target");
1269        let other_ref = process_value("other", 1, "other");
1270        let filter = ProcessListFilter::decode(&json!({
1271            "definition": target_ref,
1272            "status": "completed"
1273        }))
1274        .expect("decode filter");
1275
1276        let matching = engine_entry(
1277            "matching",
1278            target_ref,
1279            "target",
1280            ProcessStatus::Completed {
1281                await_output: ProcessAwaitOutput::from_tool_output(crate::ToolCallOutput::success(
1282                    json!(true),
1283                )),
1284            },
1285        );
1286        let wrong_definition = engine_entry(
1287            "wrong-definition",
1288            other_ref,
1289            "other",
1290            ProcessStatus::Completed {
1291                await_output: ProcessAwaitOutput::from_tool_output(crate::ToolCallOutput::success(
1292                    json!(true),
1293                )),
1294            },
1295        );
1296
1297        assert_eq!(filter.list_mode(), ProcessListMode::All);
1298        assert!(filter.matches_entry(&matching));
1299        assert!(!filter.matches_entry(&wrong_definition));
1300    }
1301
1302    #[test]
1303    fn process_list_filter_matches_waiting_facet() {
1304        let process_ref = process_value("target", 0, "target");
1305        let mut waiting_entry = engine_entry(
1306            "waiting",
1307            process_ref.clone(),
1308            "target",
1309            ProcessStatus::Running,
1310        );
1311        waiting_entry.1.wait = Some(WaitState {
1312            since_ms: 42,
1313            kind: WaitKind::Signal {
1314                name: "ready".to_string(),
1315                event_type: "signal.ready".to_string(),
1316                key: "process:waiting:signal.ready:1".to_string(),
1317                ordinal: 1,
1318            },
1319        });
1320        let idle_entry = engine_entry("idle", process_ref, "target", ProcessStatus::Running);
1321        let waiting_filter =
1322            ProcessListFilter::decode(&json!({ "waiting": true })).expect("decode waiting filter");
1323        let idle_filter =
1324            ProcessListFilter::decode(&json!({ "waiting": false })).expect("decode idle filter");
1325
1326        assert_eq!(waiting_filter.list_mode(), ProcessListMode::Live);
1327        assert!(waiting_filter.matches_entry(&waiting_entry));
1328        assert!(!waiting_filter.matches_entry(&idle_entry));
1329        assert!(!idle_filter.matches_entry(&waiting_entry));
1330        assert!(idle_filter.matches_entry(&idle_entry));
1331        assert!(
1332            ProcessListFilter::decode(&json!({ "waiting": "yes" }))
1333                .expect_err("invalid waiting filter")
1334                .contains("must be a boolean")
1335        );
1336    }
1337}