Skip to main content

lash_core/runtime/process/
model.rs

1use std::fmt;
2use std::sync::Arc;
3
4use serde::{Deserialize, Serialize};
5
6use super::events::{
7    ProcessAwaitOutput, ProcessEventType, ProcessTerminalSemantics, ProcessTerminalState,
8    default_process_event_types,
9};
10use super::time::current_epoch_ms;
11use super::validation::{
12    ensure_core_event_types, process_registration_hash, validate_process_registration,
13};
14
15pub type ProcessId = String;
16
17#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize)]
18#[serde(transparent)]
19pub struct ProcessScopeId(String);
20
21impl ProcessScopeId {
22    pub fn new(value: impl Into<String>) -> Self {
23        Self(value.into())
24    }
25
26    pub fn as_str(&self) -> &str {
27        &self.0
28    }
29}
30
31impl fmt::Display for ProcessScopeId {
32    fn fmt(&self, formatter: &mut fmt::Formatter<'_>) -> fmt::Result {
33        formatter.write_str(&self.0)
34    }
35}
36
37impl From<String> for ProcessScopeId {
38    fn from(value: String) -> Self {
39        Self::new(value)
40    }
41}
42
43impl From<&str> for ProcessScopeId {
44    fn from(value: &str) -> Self {
45        Self::new(value)
46    }
47}
48
49/// Durable executable input for a process.
50#[derive(Debug, Serialize, Deserialize)]
51#[serde(tag = "type", rename_all = "snake_case")]
52pub enum ProcessInput {
53    ToolCall {
54        call: crate::PreparedToolCall,
55    },
56    LashlangProcess {
57        module_ref: lashlang::ModuleRef,
58        process_ref: lashlang::ProcessRef,
59        required_surface_ref: lashlang::RequiredSurfaceRef,
60        process_name: String,
61        #[serde(default)]
62        args: serde_json::Map<String, serde_json::Value>,
63    },
64    SessionTurn {
65        create_request: Box<crate::SessionCreateRequest>,
66        turn_input: Box<crate::TurnInput>,
67        output_contract: crate::ToolOutputContract,
68    },
69    External {
70        #[serde(default)]
71        metadata: serde_json::Value,
72    },
73}
74
75impl Clone for ProcessInput {
76    fn clone(&self) -> Self {
77        match self {
78            Self::ToolCall { call } => Self::ToolCall { call: call.clone() },
79            Self::LashlangProcess {
80                module_ref,
81                process_ref,
82                required_surface_ref,
83                process_name,
84                args,
85            } => Self::LashlangProcess {
86                module_ref: module_ref.clone(),
87                process_ref: process_ref.clone(),
88                required_surface_ref: required_surface_ref.clone(),
89                process_name: process_name.clone(),
90                args: args.clone(),
91            },
92            Self::SessionTurn {
93                create_request,
94                turn_input,
95                output_contract,
96            } => Self::SessionTurn {
97                create_request: create_request.clone(),
98                turn_input: turn_input.clone(),
99                output_contract: output_contract.clone(),
100            },
101            Self::External { metadata } => Self::External {
102                metadata: metadata.clone(),
103            },
104        }
105    }
106}
107
108/// Execution-local context for a process start effect. This is not part of the
109/// durable process row.
110#[derive(Clone, Debug, Default, Serialize, Deserialize)]
111pub struct ProcessExecutionContext {
112    #[serde(default, skip_serializing_if = "Option::is_none")]
113    pub causal_invocation: Option<crate::RuntimeInvocation>,
114    #[serde(default, skip_serializing_if = "Option::is_none")]
115    pub wake_target_scope: Option<ProcessScope>,
116}
117
118impl ProcessExecutionContext {
119    pub fn with_causal_invocation(mut self, invocation: Option<crate::RuntimeInvocation>) -> Self {
120        self.causal_invocation = invocation;
121        self
122    }
123
124    pub fn with_wake_target_scope(mut self, scope: ProcessScope) -> Self {
125        self.wake_target_scope = Some(scope);
126        self
127    }
128
129    pub fn is_empty(&self) -> bool {
130        self.causal_invocation.is_none() && self.wake_target_scope.is_none()
131    }
132}
133
134#[derive(Clone)]
135pub struct ProcessOpScope<'scope> {
136    pub(crate) parent_invocation: Option<crate::RuntimeInvocation>,
137    pub(crate) effect_controller: crate::runtime::RuntimeEffectControllerHandle<'scope>,
138    pub(crate) agent_frame_id: Option<crate::AgentFrameId>,
139    pub(crate) target_agent_frame_id: Option<crate::AgentFrameId>,
140}
141
142impl<'scope> ProcessOpScope<'scope> {
143    pub fn new(scoped_effect_controller: crate::ScopedEffectController<'scope>) -> Self {
144        Self {
145            parent_invocation: None,
146            effect_controller: crate::runtime::RuntimeEffectControllerHandle::borrowed(
147                scoped_effect_controller,
148            ),
149            agent_frame_id: None,
150            target_agent_frame_id: None,
151        }
152    }
153
154    pub fn with_parent_invocation(
155        mut self,
156        parent_invocation: Option<crate::RuntimeInvocation>,
157    ) -> Self {
158        self.parent_invocation = parent_invocation;
159        self
160    }
161
162    pub fn with_agent_frame_id(mut self, agent_frame_id: Option<crate::AgentFrameId>) -> Self {
163        self.agent_frame_id = agent_frame_id;
164        self
165    }
166
167    pub fn with_target_agent_frame_id(
168        mut self,
169        agent_frame_id: Option<crate::AgentFrameId>,
170    ) -> Self {
171        self.target_agent_frame_id = agent_frame_id;
172        self
173    }
174
175    pub fn agent_frame_id(&self) -> Option<&str> {
176        self.agent_frame_id.as_deref()
177    }
178
179    pub fn target_agent_frame_id(&self) -> Option<&str> {
180        self.target_agent_frame_id.as_deref()
181    }
182
183    pub(crate) fn controller(&self) -> &dyn crate::RuntimeEffectController {
184        self.effect_controller.controller()
185    }
186}
187
188#[derive(Clone, Debug, Default)]
189pub struct ProcessStartOptions {
190    pub descriptor: Option<ProcessHandleDescriptor>,
191}
192
193impl ProcessStartOptions {
194    pub fn new() -> Self {
195        Self::default()
196    }
197
198    pub fn with_descriptor(mut self, descriptor: ProcessHandleDescriptor) -> Self {
199        self.descriptor = Some(descriptor);
200        self
201    }
202
203    pub fn with_optional_descriptor(mut self, descriptor: Option<ProcessHandleDescriptor>) -> Self {
204        self.descriptor = descriptor;
205        self
206    }
207
208    pub fn execution_context(&self, scope: &ProcessOpScope<'_>) -> ProcessExecutionContext {
209        ProcessExecutionContext {
210            causal_invocation: scope.parent_invocation.clone(),
211            wake_target_scope: None,
212        }
213    }
214}
215
216/// Public host-facing request for starting a visible process handle.
217#[derive(Clone, Debug, Serialize, Deserialize)]
218pub struct ProcessStartRequest {
219    pub id: ProcessId,
220    pub input: ProcessInput,
221    pub descriptor: ProcessHandleDescriptor,
222    #[serde(default)]
223    pub event_types: Vec<ProcessEventType>,
224}
225
226impl ProcessStartRequest {
227    pub fn new(
228        id: impl Into<ProcessId>,
229        input: ProcessInput,
230        descriptor: ProcessHandleDescriptor,
231    ) -> Self {
232        Self {
233            id: id.into(),
234            input,
235            descriptor,
236            event_types: default_process_event_types(),
237        }
238    }
239
240    pub fn external(
241        id: impl Into<ProcessId>,
242        descriptor: ProcessHandleDescriptor,
243        metadata: serde_json::Value,
244    ) -> Self {
245        Self::new(id, ProcessInput::External { metadata }, descriptor)
246    }
247
248    pub fn with_event_types(
249        mut self,
250        event_types: impl IntoIterator<Item = ProcessEventType>,
251    ) -> Self {
252        self.event_types = event_types.into_iter().collect();
253        self
254    }
255
256    pub fn with_extra_event_types(
257        mut self,
258        event_types: impl IntoIterator<Item = ProcessEventType>,
259    ) -> Self {
260        self.event_types.extend(event_types);
261        self
262    }
263
264    pub(crate) fn into_registration_and_options(
265        self,
266    ) -> (
267        ProcessRegistration,
268        ProcessStartOptions,
269        ProcessHandleDescriptor,
270    ) {
271        let descriptor = self.descriptor;
272        let registration =
273            ProcessRegistration::new(self.id, self.input).with_event_types(self.event_types);
274        let options = ProcessStartOptions::new().with_descriptor(descriptor.clone());
275        (registration, options, descriptor)
276    }
277}
278
279#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
280pub struct ProcessScope {
281    pub session_id: String,
282    #[serde(default, skip_serializing_if = "Option::is_none")]
283    pub agent_frame_id: Option<crate::AgentFrameId>,
284}
285
286#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
287pub struct ProcessProvenance {
288    pub owner_scope: ProcessScope,
289    pub host_profile_id: String,
290    #[serde(default, skip_serializing_if = "Option::is_none")]
291    pub caused_by: Option<crate::CausalRef>,
292}
293
294impl ProcessProvenance {
295    pub fn new(owner_scope: ProcessScope, host_profile_id: impl Into<String>) -> Self {
296        Self {
297            owner_scope,
298            host_profile_id: host_profile_id.into(),
299            caused_by: None,
300        }
301    }
302
303    pub fn with_caused_by(mut self, caused_by: Option<crate::CausalRef>) -> Self {
304        self.caused_by = caused_by;
305        self
306    }
307}
308
309impl ProcessScope {
310    pub fn new(session_id: impl Into<String>) -> Self {
311        Self {
312            session_id: session_id.into(),
313            agent_frame_id: None,
314        }
315    }
316
317    pub fn for_agent_frame(
318        session_id: impl Into<String>,
319        agent_frame_id: impl Into<crate::AgentFrameId>,
320    ) -> Self {
321        Self {
322            session_id: session_id.into(),
323            agent_frame_id: Some(agent_frame_id.into()),
324        }
325    }
326
327    pub fn id(&self) -> ProcessScopeId {
328        match self.agent_frame_id.as_deref() {
329            Some(frame_id) if !frame_id.is_empty() => {
330                ProcessScopeId::new(format!("session:{}/frame:{frame_id}", self.session_id))
331            }
332            _ => ProcessScopeId::new(format!("session:{}", self.session_id)),
333        }
334    }
335
336    pub fn is_empty(&self) -> bool {
337        self.session_id.is_empty()
338    }
339}
340
341/// Serializable process spec used to start or recover a runtime process.
342#[derive(Debug, Serialize, Deserialize)]
343pub struct ProcessRegistration {
344    pub id: ProcessId,
345    pub input: Arc<ProcessInput>,
346    #[serde(default)]
347    pub event_types: Vec<ProcessEventType>,
348    pub provenance: ProcessProvenance,
349}
350
351impl Clone for ProcessRegistration {
352    fn clone(&self) -> Self {
353        Self {
354            id: self.id.clone(),
355            input: Arc::clone(&self.input),
356            event_types: self.event_types.clone(),
357            provenance: self.provenance.clone(),
358        }
359    }
360}
361
362impl ProcessRegistration {
363    pub fn new(id: impl Into<ProcessId>, input: ProcessInput) -> Self {
364        Self {
365            id: id.into(),
366            input: Arc::new(input),
367            event_types: default_process_event_types(),
368            provenance: ProcessProvenance::new(ProcessScope::new("root"), "default"),
369        }
370    }
371
372    pub fn with_process_provenance(mut self, provenance: ProcessProvenance) -> Self {
373        self.provenance = provenance;
374        self
375    }
376
377    pub fn with_event_types(
378        mut self,
379        event_types: impl IntoIterator<Item = ProcessEventType>,
380    ) -> Self {
381        self.event_types = event_types.into_iter().collect();
382        self
383    }
384
385    pub fn with_extra_event_types(
386        mut self,
387        event_types: impl IntoIterator<Item = ProcessEventType>,
388    ) -> Self {
389        self.event_types.extend(event_types);
390        self
391    }
392}
393
394#[derive(Clone, Debug, Default, PartialEq, Serialize, Deserialize)]
395#[serde(tag = "state", rename_all = "snake_case")]
396pub enum ProcessStatus {
397    #[default]
398    Running,
399    Completed {
400        await_output: ProcessAwaitOutput,
401    },
402    Failed {
403        await_output: ProcessAwaitOutput,
404    },
405    Cancelled {
406        await_output: ProcessAwaitOutput,
407    },
408}
409
410impl ProcessStatus {
411    pub fn from_terminal(terminal: ProcessTerminalSemantics) -> Self {
412        match terminal.state {
413            ProcessTerminalState::Completed => Self::Completed {
414                await_output: terminal.await_output,
415            },
416            ProcessTerminalState::Failed => Self::Failed {
417                await_output: terminal.await_output,
418            },
419            ProcessTerminalState::Cancelled => Self::Cancelled {
420                await_output: terminal.await_output,
421            },
422        }
423    }
424
425    pub fn is_terminal(&self) -> bool {
426        !matches!(self, Self::Running)
427    }
428
429    pub fn label(&self) -> &'static str {
430        match self {
431            Self::Running => "running",
432            Self::Completed { .. } => "completed",
433            Self::Failed { .. } => "failed",
434            Self::Cancelled { .. } => "cancelled",
435        }
436    }
437
438    pub fn terminal_state(&self) -> Option<ProcessTerminalState> {
439        match self {
440            Self::Running => None,
441            Self::Completed { .. } => Some(ProcessTerminalState::Completed),
442            Self::Failed { .. } => Some(ProcessTerminalState::Failed),
443            Self::Cancelled { .. } => Some(ProcessTerminalState::Cancelled),
444        }
445    }
446
447    pub fn await_output(&self) -> Option<&ProcessAwaitOutput> {
448        match self {
449            Self::Running => None,
450            Self::Completed { await_output }
451            | Self::Failed { await_output }
452            | Self::Cancelled { await_output } => Some(await_output),
453        }
454    }
455
456    pub fn terminal_semantics(&self) -> Option<ProcessTerminalSemantics> {
457        Some(ProcessTerminalSemantics {
458            state: self.terminal_state()?,
459            await_output: self.await_output()?.clone(),
460        })
461    }
462}
463
464/// Durable process row. Session-visible addressability lives in
465/// [`ProcessHandleGrant`], not in the process record.
466#[derive(Clone, Debug, Serialize, Deserialize)]
467pub struct ProcessRecord {
468    pub id: ProcessId,
469    pub registration_hash: String,
470    pub input: Arc<ProcessInput>,
471    #[serde(default)]
472    pub event_types: Vec<ProcessEventType>,
473    pub provenance: ProcessProvenance,
474    #[serde(default)]
475    pub created_at_ms: u64,
476    #[serde(default)]
477    pub updated_at_ms: u64,
478    #[serde(default, skip_serializing_if = "Option::is_none")]
479    pub external_ref: Option<ProcessExternalRef>,
480    #[serde(default)]
481    pub status: ProcessStatus,
482}
483
484impl ProcessRecord {
485    pub fn from_registration(mut registration: ProcessRegistration) -> Self {
486        ensure_core_event_types(&mut registration);
487        validate_process_registration(&registration)
488            .expect("process registration should be valid before record construction");
489        let registration_hash = process_registration_hash(&registration)
490            .expect("process registration should hash before record construction");
491        Self::from_prepared_registration(registration, registration_hash, current_epoch_ms())
492    }
493
494    pub fn from_prepared_registration(
495        registration: ProcessRegistration,
496        registration_hash: String,
497        now_ms: u64,
498    ) -> Self {
499        Self {
500            id: registration.id,
501            registration_hash,
502            input: registration.input,
503            event_types: registration.event_types,
504            provenance: registration.provenance,
505            created_at_ms: now_ms,
506            updated_at_ms: now_ms,
507            external_ref: None,
508            status: ProcessStatus::Running,
509        }
510    }
511
512    pub fn is_terminal(&self) -> bool {
513        self.status.is_terminal()
514    }
515
516    pub fn owner_scope_id(&self) -> ProcessScopeId {
517        self.provenance.owner_scope.id()
518    }
519
520    pub fn host_profile_id(&self) -> &str {
521        &self.provenance.host_profile_id
522    }
523}
524
525/// Wire-format version stamped on every persisted [`ProcessLease`].
526///
527/// Bump when the on-wire shape of `ProcessLease` changes in a way that older
528/// code cannot safely deserialize.
529pub const PROCESS_LEASE_SCHEMA_VERSION: u32 = 1;
530
531/// Durable lease over a non-terminal background process.
532///
533/// The lease pair `(owner_id, lease_token)` plus `fencing_token` are how lash guarantees that
534/// one non-terminal process is re-executed by exactly one worker at a time —
535/// even after a crash, even across two workers that both sweep the same
536/// registry for recoverable work. The durable backend
537/// (`lash-sqlite-store`) uses these to serialize concurrent claims on the same
538/// `process_id`; future distributed durable backends use the *same* fields to
539/// coordinate workers that don't share a file system.
540///
541/// **This is not single-process theatre.** The owner / fencing-token /
542/// lease-token triple is the public contract that lets any backend detect and
543/// reject stale writers. Treat it as load-bearing, not defensive.
544#[derive(Clone, Debug, Serialize, Deserialize)]
545pub struct ProcessLease {
546    pub schema_version: u32,
547    pub process_id: ProcessId,
548    pub owner_id: String,
549    pub lease_token: String,
550    pub fencing_token: u64,
551    pub claimed_at_epoch_ms: u64,
552    pub expires_at_epoch_ms: u64,
553}
554
555#[derive(Clone, Debug, Serialize, Deserialize)]
556pub struct ProcessLeaseCompletion {
557    pub process_id: ProcessId,
558    pub lease_token: String,
559}
560
561impl ProcessLeaseCompletion {
562    pub fn from_lease(lease: &ProcessLease) -> Self {
563        Self {
564            process_id: lease.process_id.clone(),
565            lease_token: lease.lease_token.clone(),
566        }
567    }
568}
569
570/// Durable backend reference for background work accepted outside the local process.
571#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq)]
572pub struct ProcessExternalRef {
573    pub backend: String,
574    pub id: String,
575    #[serde(default, skip_serializing_if = "Option::is_none")]
576    pub metadata: Option<serde_json::Value>,
577}
578
579#[derive(Clone, Debug, Default, PartialEq, Eq, Serialize, Deserialize)]
580pub struct ProcessHandleDescriptor {
581    #[serde(default, skip_serializing_if = "Option::is_none")]
582    pub kind: Option<String>,
583    #[serde(default, skip_serializing_if = "Option::is_none")]
584    pub label: Option<String>,
585}
586
587impl ProcessHandleDescriptor {
588    pub fn new(kind: Option<impl Into<String>>, label: Option<impl Into<String>>) -> Self {
589        Self {
590            kind: kind.map(Into::into),
591            label: label.map(Into::into),
592        }
593    }
594}
595
596#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
597pub struct ProcessHandleGrant {
598    pub session_id: String,
599    pub process_id: ProcessId,
600    pub descriptor: ProcessHandleDescriptor,
601}
602
603pub type ProcessHandleGrantEntry = (ProcessHandleGrant, ProcessRecord);
604
605#[derive(Clone, Copy, Debug, Default, PartialEq, Eq, Serialize, Deserialize)]
606#[serde(rename_all = "snake_case")]
607pub enum ProcessLifecycleStatus {
608    #[default]
609    Running,
610    Completed,
611    Failed,
612    Cancelled,
613}
614
615impl ProcessLifecycleStatus {
616    pub fn label(self) -> &'static str {
617        match self {
618            Self::Running => "running",
619            Self::Completed => "completed",
620            Self::Failed => "failed",
621            Self::Cancelled => "cancelled",
622        }
623    }
624
625    pub fn is_terminal(self) -> bool {
626        !matches!(self, Self::Running)
627    }
628
629    pub fn terminal_state(self) -> Option<ProcessTerminalState> {
630        match self {
631            Self::Running => None,
632            Self::Completed => Some(ProcessTerminalState::Completed),
633            Self::Failed => Some(ProcessTerminalState::Failed),
634            Self::Cancelled => Some(ProcessTerminalState::Cancelled),
635        }
636    }
637}
638
639impl From<&ProcessStatus> for ProcessLifecycleStatus {
640    fn from(status: &ProcessStatus) -> Self {
641        match status {
642            ProcessStatus::Running => Self::Running,
643            ProcessStatus::Completed { .. } => Self::Completed,
644            ProcessStatus::Failed { .. } => Self::Failed,
645            ProcessStatus::Cancelled { .. } => Self::Cancelled,
646        }
647    }
648}
649
650impl From<ProcessStatus> for ProcessLifecycleStatus {
651    fn from(status: ProcessStatus) -> Self {
652        Self::from(&status)
653    }
654}
655
656#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
657pub struct ProcessHandleSummary {
658    #[serde(rename = "__handle__")]
659    pub handle_type: String,
660    pub id: ProcessId,
661    pub process_id: ProcessId,
662    pub descriptor: ProcessHandleDescriptor,
663    #[serde(default, skip_serializing_if = "Option::is_none")]
664    pub definition: Option<ProcessDefinitionSummary>,
665    pub status: ProcessLifecycleStatus,
666}
667
668impl ProcessHandleSummary {
669    pub fn new(
670        process_id: impl Into<ProcessId>,
671        descriptor: ProcessHandleDescriptor,
672        status: ProcessLifecycleStatus,
673    ) -> Self {
674        let process_id = process_id.into();
675        Self {
676            handle_type: "process".to_string(),
677            id: process_id.clone(),
678            process_id,
679            descriptor,
680            definition: None,
681            status,
682        }
683    }
684
685    pub fn with_definition(mut self, definition: Option<ProcessDefinitionSummary>) -> Self {
686        self.definition = definition;
687        self
688    }
689
690    pub fn from_grant_record(grant: ProcessHandleGrant, record: ProcessRecord) -> Self {
691        let definition = ProcessDefinitionSummary::from_input(record.input.as_ref());
692        Self::new(
693            record.id,
694            grant.descriptor,
695            ProcessLifecycleStatus::from(record.status),
696        )
697        .with_definition(definition)
698    }
699}
700
701impl From<ProcessHandleGrantEntry> for ProcessHandleSummary {
702    fn from((grant, record): ProcessHandleGrantEntry) -> Self {
703        Self::from_grant_record(grant, record)
704    }
705}
706
707#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
708pub struct ProcessCancelSummary {
709    pub process_id: ProcessId,
710    pub status: ProcessLifecycleStatus,
711}
712
713impl ProcessCancelSummary {
714    pub fn from_record(record: ProcessRecord) -> Self {
715        Self {
716            process_id: record.id,
717            status: ProcessLifecycleStatus::from(record.status),
718        }
719    }
720}
721
722#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
723pub struct ProcessDefinitionSummary {
724    pub name: String,
725}
726
727impl ProcessDefinitionSummary {
728    pub fn from_input(input: &ProcessInput) -> Option<Self> {
729        match input {
730            ProcessInput::LashlangProcess { process_name, .. } => Some(Self {
731                name: process_name.clone(),
732            }),
733            ProcessInput::ToolCall { .. }
734            | ProcessInput::SessionTurn { .. }
735            | ProcessInput::External { .. } => None,
736        }
737    }
738}
739
740#[derive(Clone, Debug, PartialEq, Eq)]
741pub struct ProcessDefinitionSelector {
742    module_ref: lashlang::ModuleRef,
743    required_surface_ref: lashlang::RequiredSurfaceRef,
744    process_ref: lashlang::ProcessRef,
745    process_name: String,
746}
747
748impl ProcessDefinitionSelector {
749    pub fn decode(value: &serde_json::Value) -> Result<Self, String> {
750        if value
751            .get(lashlang::LASH_PROCESS_VALUE_KEY)
752            .and_then(serde_json::Value::as_bool)
753            != Some(true)
754        {
755            return Err("definition must be a process definition value".to_string());
756        }
757        Ok(Self {
758            module_ref: decode_process_definition_field(
759                value,
760                lashlang::LASH_MODULE_REF_KEY,
761                "definition",
762            )?,
763            required_surface_ref: decode_process_definition_field(
764                value,
765                lashlang::LASH_REQUIRED_SURFACE_REF_KEY,
766                "definition",
767            )?,
768            process_ref: decode_process_definition_field(
769                value,
770                lashlang::LASH_PROCESS_REF_KEY,
771                "definition",
772            )?,
773            process_name: value
774                .get(lashlang::LASH_PROCESS_NAME_KEY)
775                .and_then(serde_json::Value::as_str)
776                .ok_or_else(|| "definition is missing its process name".to_string())?
777                .to_string(),
778        })
779    }
780
781    pub fn matches_input(&self, input: &ProcessInput) -> bool {
782        match input {
783            ProcessInput::LashlangProcess {
784                module_ref,
785                process_ref,
786                required_surface_ref,
787                process_name,
788                ..
789            } => {
790                self.module_ref == *module_ref
791                    && self.required_surface_ref == *required_surface_ref
792                    && self.process_ref == *process_ref
793                    && self.process_name == *process_name
794            }
795            ProcessInput::ToolCall { .. }
796            | ProcessInput::SessionTurn { .. }
797            | ProcessInput::External { .. } => false,
798        }
799    }
800}
801
802fn decode_process_definition_field<T: serde::de::DeserializeOwned>(
803    value: &serde_json::Value,
804    field: &'static str,
805    label: &'static str,
806) -> Result<T, String> {
807    serde_json::from_value(
808        value
809            .get(field)
810            .cloned()
811            .ok_or_else(|| format!("{label} is missing {field}"))?,
812    )
813    .map_err(|err| format!("{label} has invalid {field}: {err}"))
814}
815
816#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)]
817pub enum ProcessStatusFilter {
818    #[default]
819    Running,
820    Completed,
821    Failed,
822    Cancelled,
823    Any,
824}
825
826impl ProcessStatusFilter {
827    pub fn decode(value: Option<&str>) -> Result<Self, String> {
828        match value.unwrap_or("running") {
829            "running" => Ok(Self::Running),
830            "completed" => Ok(Self::Completed),
831            "failed" => Ok(Self::Failed),
832            "cancelled" => Ok(Self::Cancelled),
833            "any" => Ok(Self::Any),
834            other => Err(format!(
835                "processes.list status must be `running`, `completed`, `failed`, `cancelled`, or `any`, got `{other}`"
836            )),
837        }
838    }
839
840    pub fn list_mode(self) -> ProcessListMode {
841        match self {
842            Self::Running => ProcessListMode::Live,
843            Self::Completed | Self::Failed | Self::Cancelled | Self::Any => ProcessListMode::All,
844        }
845    }
846
847    pub fn matches(self, status: ProcessLifecycleStatus) -> bool {
848        match self {
849            Self::Running => status == ProcessLifecycleStatus::Running,
850            Self::Completed => status == ProcessLifecycleStatus::Completed,
851            Self::Failed => status == ProcessLifecycleStatus::Failed,
852            Self::Cancelled => status == ProcessLifecycleStatus::Cancelled,
853            Self::Any => true,
854        }
855    }
856}
857
858#[derive(Clone, Debug, Default, PartialEq, Eq)]
859pub struct ProcessListFilter {
860    pub definition: Option<ProcessDefinitionSelector>,
861    pub status: ProcessStatusFilter,
862}
863
864impl ProcessListFilter {
865    pub fn decode(args: &serde_json::Value) -> Result<Self, String> {
866        let map = args
867            .as_object()
868            .ok_or_else(|| "processes.list expects a record of process filters".to_string())?;
869        for key in map.keys() {
870            match key.as_str() {
871                "definition" | "status" => {}
872                _ => return Err(format!("processes.list unknown filter `{key}`")),
873            }
874        }
875        let definition = args
876            .get("definition")
877            .map(ProcessDefinitionSelector::decode)
878            .transpose()?;
879        let status =
880            ProcessStatusFilter::decode(args.get("status").and_then(serde_json::Value::as_str))?;
881        Ok(Self { definition, status })
882    }
883
884    pub fn list_mode(&self) -> ProcessListMode {
885        self.status.list_mode()
886    }
887
888    pub fn matches_entry(&self, entry: &ProcessHandleGrantEntry) -> bool {
889        let (_grant, record) = entry;
890        let status = ProcessLifecycleStatus::from(&record.status);
891        self.status.matches(status)
892            && self
893                .definition
894                .as_ref()
895                .is_none_or(|definition| definition.matches_input(record.input.as_ref()))
896    }
897}
898
899#[derive(Clone, Copy, Debug, Default, PartialEq, Eq, Serialize, Deserialize)]
900#[serde(rename_all = "snake_case")]
901pub enum ProcessListMode {
902    #[default]
903    Live,
904    All,
905}
906
907impl ProcessListMode {
908    pub fn as_str(self) -> &'static str {
909        match self {
910            Self::Live => "live",
911            Self::All => "all",
912        }
913    }
914}
915
916#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
917pub struct ProcessStartGrant {
918    pub owner_scope: ProcessScope,
919    pub descriptor: ProcessHandleDescriptor,
920}
921
922#[derive(Clone, Debug, Default, PartialEq, Eq, Serialize, Deserialize)]
923pub struct ProcessSessionDeleteReport {
924    pub session_id: String,
925    pub revoked_handle_count: usize,
926    pub deleted_wake_count: usize,
927    pub cancel_process_ids: Vec<String>,
928    pub preserved_process_ids: Vec<String>,
929}
930
931#[cfg(test)]
932mod tests {
933    use serde_json::json;
934
935    use super::*;
936
937    fn process_ref(component: &str, pos: usize) -> lashlang::ProcessRef {
938        lashlang::ProcessRef {
939            component: lashlang::ContentHash::new(component),
940            pos: pos as u32,
941        }
942    }
943
944    fn process_value(
945        module_ref: &lashlang::ModuleRef,
946        surface_ref: &lashlang::RequiredSurfaceRef,
947        process_ref: &lashlang::ProcessRef,
948        name: &str,
949    ) -> serde_json::Value {
950        let mut value = serde_json::Map::new();
951        value.insert(lashlang::LASH_PROCESS_VALUE_KEY.to_string(), json!(true));
952        value.insert(lashlang::LASH_MODULE_REF_KEY.to_string(), json!(module_ref));
953        value.insert(
954            lashlang::LASH_REQUIRED_SURFACE_REF_KEY.to_string(),
955            json!(surface_ref),
956        );
957        value.insert(
958            lashlang::LASH_PROCESS_REF_KEY.to_string(),
959            json!(process_ref),
960        );
961        value.insert(lashlang::LASH_PROCESS_NAME_KEY.to_string(), json!(name));
962        serde_json::Value::Object(value)
963    }
964
965    fn lashlang_entry(
966        process_id: &str,
967        module_ref: lashlang::ModuleRef,
968        surface_ref: lashlang::RequiredSurfaceRef,
969        process_ref: lashlang::ProcessRef,
970        process_name: &str,
971        status: ProcessStatus,
972    ) -> ProcessHandleGrantEntry {
973        let mut record = ProcessRecord::from_registration(ProcessRegistration::new(
974            process_id,
975            ProcessInput::LashlangProcess {
976                module_ref,
977                process_ref,
978                required_surface_ref: surface_ref,
979                process_name: process_name.to_string(),
980                args: serde_json::Map::new(),
981            },
982        ));
983        record.status = status;
984        (
985            ProcessHandleGrant {
986                session_id: "session".to_string(),
987                process_id: process_id.to_string(),
988                descriptor: ProcessHandleDescriptor::new(Some("lashlang"), Some(process_name)),
989            },
990            record,
991        )
992    }
993
994    #[test]
995    fn process_list_filter_matches_definition_and_status() {
996        let module_ref = lashlang::ModuleRef::new(&lashlang::ContentHash::new("module"));
997        let surface_ref = lashlang::RequiredSurfaceRef::new(&lashlang::ContentHash::new("surface"));
998        let target_ref = process_ref("target", 0);
999        let other_ref = process_ref("other", 1);
1000        let filter = ProcessListFilter::decode(&json!({
1001            "definition": process_value(&module_ref, &surface_ref, &target_ref, "target"),
1002            "status": "completed"
1003        }))
1004        .expect("decode filter");
1005
1006        let matching = lashlang_entry(
1007            "matching",
1008            module_ref.clone(),
1009            surface_ref.clone(),
1010            target_ref,
1011            "target",
1012            ProcessStatus::Completed {
1013                await_output: ProcessAwaitOutput::from_tool_output(crate::ToolCallOutput::success(
1014                    json!(true),
1015                )),
1016            },
1017        );
1018        let wrong_definition = lashlang_entry(
1019            "wrong-definition",
1020            module_ref,
1021            surface_ref,
1022            other_ref,
1023            "other",
1024            ProcessStatus::Completed {
1025                await_output: ProcessAwaitOutput::from_tool_output(crate::ToolCallOutput::success(
1026                    json!(true),
1027                )),
1028            },
1029        );
1030
1031        assert_eq!(filter.list_mode(), ProcessListMode::All);
1032        assert!(filter.matches_entry(&matching));
1033        assert!(!filter.matches_entry(&wrong_definition));
1034    }
1035}