Skip to main content

lash_core/runtime/process/
service.rs

1use crate::plugin::PluginError;
2
3use super::events::{ProcessAwaitOutput, ProcessEvent};
4use super::model::{
5    ProcessCancelSummary, ProcessHandleGrantEntry, ProcessHandleSummary, ProcessListMode,
6    ProcessOpScope, ProcessRecord, ProcessRegistration, ProcessStartOptions, ProcessStartRequest,
7};
8
/// Records which caller surface a process cancellation came from.
///
/// The value is carried on [`ProcessCancelRequest`] and
/// [`ProcessCancelAllRequest`]; it is a plain `Copy` tag with no payload.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ProcessCancelSource {
    /// The cancellation is attributed to a tool.
    Tool,
    /// The cancellation is attributed to Lashlang.
    Lashlang,
    /// The cancellation is attributed to the host API.
    HostApi,
}
15
16#[derive(Clone)]
17pub struct ProcessCancelRequest<'scope> {
18    pub session_id: &'scope str,
19    pub process_id: &'scope str,
20    pub handle: Option<serde_json::Value>,
21    pub scope: ProcessOpScope<'scope>,
22    pub reason: Option<String>,
23    pub source: ProcessCancelSource,
24}
25
26impl<'scope> ProcessCancelRequest<'scope> {
27    pub fn new(
28        session_id: &'scope str,
29        process_id: &'scope str,
30        scope: ProcessOpScope<'scope>,
31        source: ProcessCancelSource,
32    ) -> Self {
33        Self {
34            session_id,
35            process_id,
36            handle: None,
37            scope,
38            reason: None,
39            source,
40        }
41    }
42
43    pub fn with_handle(mut self, handle: serde_json::Value) -> Self {
44        self.handle = Some(handle);
45        self
46    }
47
48    pub fn with_reason(mut self, reason: impl Into<String>) -> Self {
49        self.reason = Some(reason.into());
50        self
51    }
52}
53
54#[derive(Clone)]
55pub struct ProcessCancelAllRequest<'scope> {
56    pub session_id: &'scope str,
57    pub scope: ProcessOpScope<'scope>,
58    pub source: ProcessCancelSource,
59    pub reason: Option<String>,
60}
61
62impl<'scope> ProcessCancelAllRequest<'scope> {
63    pub fn new(
64        session_id: &'scope str,
65        scope: ProcessOpScope<'scope>,
66        source: ProcessCancelSource,
67    ) -> Self {
68        Self {
69            session_id,
70            scope,
71            source,
72            reason: None,
73        }
74    }
75
76    pub fn with_reason(mut self, reason: impl Into<String>) -> Self {
77        self.reason = Some(reason.into());
78        self
79    }
80}
81
82#[async_trait::async_trait]
83pub trait ProcessCancelAbility: Send + Sync {
84    async fn cancel(
85        &self,
86        processes: &dyn ProcessService,
87        request: ProcessCancelRequest<'_>,
88    ) -> Result<ProcessRecord, PluginError>;
89
90    async fn cancel_summary(
91        &self,
92        processes: &dyn ProcessService,
93        request: ProcessCancelRequest<'_>,
94    ) -> Result<ProcessCancelSummary, PluginError> {
95        self.cancel(processes, request)
96            .await
97            .map(ProcessCancelSummary::from_record)
98    }
99
100    async fn cancel_all_visible(
101        &self,
102        processes: &dyn ProcessService,
103        request: ProcessCancelAllRequest<'_>,
104    ) -> Result<Vec<ProcessCancelSummary>, PluginError> {
105        let entries = processes
106            .list_visible(
107                request.session_id,
108                ProcessListMode::Live,
109                request.scope.clone(),
110            )
111            .await?;
112        let mut cancelled = Vec::new();
113        for (grant, record) in entries {
114            if record.is_terminal() {
115                continue;
116            }
117            let mut cancel_request = ProcessCancelRequest::new(
118                request.session_id,
119                &grant.process_id,
120                request.scope.clone(),
121                request.source,
122            );
123            if let Some(reason) = request.reason.clone() {
124                cancel_request = cancel_request.with_reason(reason);
125            }
126            cancelled.push(self.cancel_summary(processes, cancel_request).await?);
127        }
128        Ok(cancelled)
129    }
130}
131
/// Stateless cancel ability; it carries no data, so any two values are
/// interchangeable.
#[derive(Debug, Default, Clone, Copy)]
pub struct DefaultProcessCancelAbility;
134
135#[async_trait::async_trait]
136impl ProcessCancelAbility for DefaultProcessCancelAbility {
137    async fn cancel(
138        &self,
139        processes: &dyn ProcessService,
140        request: ProcessCancelRequest<'_>,
141    ) -> Result<ProcessRecord, PluginError> {
142        let process_ids = [request.process_id.to_string()];
143        processes
144            .validate_visible(request.session_id, &process_ids, request.scope.clone())
145            .await?;
146        processes
147            .cancel(request.session_id, request.process_id, request.scope)
148            .await
149    }
150}
151
152#[async_trait::async_trait]
153pub trait ProcessService: Send + Sync {
154    async fn start_from_request(
155        &self,
156        session_id: &str,
157        request: ProcessStartRequest,
158        scope: ProcessOpScope<'_>,
159    ) -> Result<ProcessHandleSummary, PluginError> {
160        let _ = (session_id, request, scope);
161        Err(PluginError::Session(
162            "process start request composition is unavailable in this service".to_string(),
163        ))
164    }
165
166    async fn start(
167        &self,
168        session_id: &str,
169        registration: ProcessRegistration,
170        options: ProcessStartOptions,
171        scope: ProcessOpScope<'_>,
172    ) -> Result<ProcessRecord, PluginError>;
173
174    async fn await_process(
175        &self,
176        process_id: &str,
177        scope: ProcessOpScope<'_>,
178    ) -> Result<ProcessAwaitOutput, PluginError>;
179
180    async fn list_visible(
181        &self,
182        session_id: &str,
183        mode: ProcessListMode,
184        scope: ProcessOpScope<'_>,
185    ) -> Result<Vec<ProcessHandleGrantEntry>, PluginError>;
186
187    async fn validate_visible(
188        &self,
189        session_id: &str,
190        process_ids: &[String],
191        scope: ProcessOpScope<'_>,
192    ) -> Result<(), PluginError>;
193
194    async fn cancel(
195        &self,
196        session_id: &str,
197        process_id: &str,
198        scope: ProcessOpScope<'_>,
199    ) -> Result<ProcessRecord, PluginError>;
200
201    async fn signal(
202        &self,
203        session_id: &str,
204        process_id: &str,
205        signal_name: String,
206        signal_id: String,
207        payload: serde_json::Value,
208        scope: ProcessOpScope<'_>,
209    ) -> Result<ProcessEvent, PluginError>;
210
211    async fn transfer(
212        &self,
213        from_session_id: &str,
214        to_session_id: &str,
215        process_ids: Vec<String>,
216        scope: ProcessOpScope<'_>,
217    ) -> Result<(), PluginError>;
218
219    async fn cancel_unreferenced(
220        &self,
221        session_id: &str,
222        keep_process_ids: Vec<String>,
223        scope: ProcessOpScope<'_>,
224    ) -> Result<Vec<ProcessRecord>, PluginError>;
225}
226
/// Placeholder process service for runtimes that have no process support.
///
/// The type carries no state. It derives the same common traits as
/// `DefaultProcessCancelAbility`, so it can be copied, defaulted and
/// debug-printed like the other unit types in this module.
#[derive(Clone, Copy, Debug, Default)]
pub struct UnavailableProcessService;
228
229#[async_trait::async_trait]
230impl ProcessService for UnavailableProcessService {
231    async fn start(
232        &self,
233        _session_id: &str,
234        _registration: ProcessRegistration,
235        _options: ProcessStartOptions,
236        _scope: ProcessOpScope<'_>,
237    ) -> Result<ProcessRecord, PluginError> {
238        Err(PluginError::Session(
239            "processes are unavailable in this runtime".to_string(),
240        ))
241    }
242
243    async fn await_process(
244        &self,
245        _process_id: &str,
246        _scope: ProcessOpScope<'_>,
247    ) -> Result<ProcessAwaitOutput, PluginError> {
248        Err(PluginError::Session(
249            "process awaiting is unavailable in this runtime".to_string(),
250        ))
251    }
252
253    async fn list_visible(
254        &self,
255        _session_id: &str,
256        _mode: ProcessListMode,
257        _scope: ProcessOpScope<'_>,
258    ) -> Result<Vec<ProcessHandleGrantEntry>, PluginError> {
259        Err(PluginError::Session(
260            "process registry is unavailable in this runtime".to_string(),
261        ))
262    }
263
264    async fn validate_visible(
265        &self,
266        _session_id: &str,
267        _process_ids: &[String],
268        _scope: ProcessOpScope<'_>,
269    ) -> Result<(), PluginError> {
270        Err(PluginError::Session(
271            "process handle validation is unavailable in this runtime".to_string(),
272        ))
273    }
274
275    async fn cancel(
276        &self,
277        _session_id: &str,
278        _process_id: &str,
279        _scope: ProcessOpScope<'_>,
280    ) -> Result<ProcessRecord, PluginError> {
281        Err(PluginError::Session(
282            "process registry is unavailable in this runtime".to_string(),
283        ))
284    }
285
286    async fn signal(
287        &self,
288        _session_id: &str,
289        _process_id: &str,
290        _signal_name: String,
291        _signal_id: String,
292        _payload: serde_json::Value,
293        _scope: ProcessOpScope<'_>,
294    ) -> Result<ProcessEvent, PluginError> {
295        Err(PluginError::Session(
296            "process signalling is unavailable in this runtime".to_string(),
297        ))
298    }
299
300    async fn transfer(
301        &self,
302        _from_session_id: &str,
303        _to_session_id: &str,
304        process_ids: Vec<String>,
305        _scope: ProcessOpScope<'_>,
306    ) -> Result<(), PluginError> {
307        if process_ids.is_empty() {
308            return Ok(());
309        }
310        Err(PluginError::Session(
311            "process handle transfer is unavailable in this runtime".to_string(),
312        ))
313    }
314
315    async fn cancel_unreferenced(
316        &self,
317        _session_id: &str,
318        _keep_process_ids: Vec<String>,
319        _scope: ProcessOpScope<'_>,
320    ) -> Result<Vec<ProcessRecord>, PluginError> {
321        Err(PluginError::Session(
322            "process handle cleanup is unavailable in this runtime".to_string(),
323        ))
324    }
325}
326
327#[cfg(test)]
328mod tests {
329    use std::collections::HashSet;
330    use std::sync::{Arc, Mutex};
331
332    use serde_json::json;
333
334    use super::*;
335    use crate::{
336        ProcessAwaitOutput, ProcessEvent, ProcessHandleDescriptor, ProcessHandleGrant,
337        ProcessInput, ProcessProvenance, ProcessRegistration, ProcessStatus,
338    };
339
340    struct RecordingProcessService {
341        visible: HashSet<String>,
342        validate_calls: Mutex<Vec<Vec<String>>>,
343        cancel_calls: Mutex<Vec<String>>,
344        visible_entries: Vec<ProcessHandleGrantEntry>,
345        record: ProcessRecord,
346    }
347
348    impl RecordingProcessService {
349        fn new(visible: impl IntoIterator<Item = String>, record: ProcessRecord) -> Self {
350            Self {
351                visible: visible.into_iter().collect(),
352                validate_calls: Mutex::new(Vec::new()),
353                cancel_calls: Mutex::new(Vec::new()),
354                visible_entries: Vec::new(),
355                record,
356            }
357        }
358
359        fn with_visible_entries(mut self, process_ids: impl IntoIterator<Item = String>) -> Self {
360            self.visible_entries = process_ids
361                .into_iter()
362                .map(|process_id| {
363                    (
364                        ProcessHandleGrant {
365                            session_id: "session-1".to_string(),
366                            process_id: process_id.clone(),
367                            descriptor: ProcessHandleDescriptor::new(
368                                Some("test"),
369                                Some(process_id.clone()),
370                            ),
371                        },
372                        ProcessRecord::from_registration(ProcessRegistration::new(
373                            process_id,
374                            ProcessInput::External {
375                                metadata: json!(null),
376                            },
377                            ProcessProvenance::host("service-test-host"),
378                        )),
379                    )
380                })
381                .collect();
382            self
383        }
384
385        fn validate_calls(&self) -> Vec<Vec<String>> {
386            self.validate_calls.lock().expect("validate calls").clone()
387        }
388
389        fn cancel_calls(&self) -> Vec<String> {
390            self.cancel_calls.lock().expect("cancel calls").clone()
391        }
392    }
393
394    #[derive(Default)]
395    struct RecordingCancelAbility {
396        requests: Mutex<Vec<(String, ProcessCancelSource, Option<String>)>>,
397    }
398
399    impl RecordingCancelAbility {
400        fn requests(&self) -> Vec<(String, ProcessCancelSource, Option<String>)> {
401            self.requests.lock().expect("cancel requests").clone()
402        }
403    }
404
405    #[async_trait::async_trait]
406    impl ProcessCancelAbility for RecordingCancelAbility {
407        async fn cancel(
408            &self,
409            processes: &dyn ProcessService,
410            request: ProcessCancelRequest<'_>,
411        ) -> Result<ProcessRecord, PluginError> {
412            self.requests.lock().expect("cancel requests").push((
413                request.process_id.to_string(),
414                request.source,
415                request.reason.clone(),
416            ));
417            DefaultProcessCancelAbility.cancel(processes, request).await
418        }
419    }
420
421    #[async_trait::async_trait]
422    impl ProcessService for RecordingProcessService {
423        async fn start(
424            &self,
425            _session_id: &str,
426            _registration: ProcessRegistration,
427            _options: ProcessStartOptions,
428            _scope: ProcessOpScope<'_>,
429        ) -> Result<ProcessRecord, PluginError> {
430            Err(PluginError::Session("start not implemented".to_string()))
431        }
432
433        async fn await_process(
434            &self,
435            _process_id: &str,
436            _scope: ProcessOpScope<'_>,
437        ) -> Result<ProcessAwaitOutput, PluginError> {
438            Err(PluginError::Session("await not implemented".to_string()))
439        }
440
441        async fn list_visible(
442            &self,
443            _session_id: &str,
444            _mode: ProcessListMode,
445            _scope: ProcessOpScope<'_>,
446        ) -> Result<Vec<ProcessHandleGrantEntry>, PluginError> {
447            Ok(self.visible_entries.clone())
448        }
449
450        async fn validate_visible(
451            &self,
452            _session_id: &str,
453            process_ids: &[String],
454            _scope: ProcessOpScope<'_>,
455        ) -> Result<(), PluginError> {
456            self.validate_calls
457                .lock()
458                .expect("validate calls")
459                .push(process_ids.to_vec());
460            if let Some(missing) = process_ids
461                .iter()
462                .find(|process_id| !self.visible.contains(*process_id))
463            {
464                return Err(PluginError::Session(format!(
465                    "process handle `{missing}` is not visible"
466                )));
467            }
468            Ok(())
469        }
470
471        async fn cancel(
472            &self,
473            _session_id: &str,
474            process_id: &str,
475            _scope: ProcessOpScope<'_>,
476        ) -> Result<ProcessRecord, PluginError> {
477            self.cancel_calls
478                .lock()
479                .expect("cancel calls")
480                .push(process_id.to_string());
481            let mut record = self.record.clone();
482            record.id = process_id.to_string();
483            Ok(record)
484        }
485
486        async fn signal(
487            &self,
488            _session_id: &str,
489            _process_id: &str,
490            _signal_name: String,
491            _signal_id: String,
492            _payload: serde_json::Value,
493            _scope: ProcessOpScope<'_>,
494        ) -> Result<ProcessEvent, PluginError> {
495            Err(PluginError::Session("signal not implemented".to_string()))
496        }
497
498        async fn transfer(
499            &self,
500            _from_session_id: &str,
501            _to_session_id: &str,
502            _process_ids: Vec<String>,
503            _scope: ProcessOpScope<'_>,
504        ) -> Result<(), PluginError> {
505            Err(PluginError::Session("transfer not implemented".to_string()))
506        }
507
508        async fn cancel_unreferenced(
509            &self,
510            _session_id: &str,
511            _keep_process_ids: Vec<String>,
512            _scope: ProcessOpScope<'_>,
513        ) -> Result<Vec<ProcessRecord>, PluginError> {
514            Err(PluginError::Session(
515                "cancel unreferenced not implemented".to_string(),
516            ))
517        }
518    }
519
520    fn cancelled_record(process_id: &str) -> ProcessRecord {
521        let mut record = ProcessRecord::from_registration(ProcessRegistration::new(
522            process_id,
523            ProcessInput::External {
524                metadata: json!(null),
525            },
526            ProcessProvenance::host("service-test-host"),
527        ));
528        record.status = ProcessStatus::Cancelled {
529            await_output: ProcessAwaitOutput::Cancelled {
530                message: "cancelled".to_string(),
531                raw: None,
532                control: None,
533            },
534        };
535        record
536    }
537
538    fn test_process_scope(id: &str) -> ProcessOpScope<'static> {
539        ProcessOpScope::new(
540            crate::ScopedEffectController::shared(
541                Arc::new(crate::InlineRuntimeEffectController),
542                crate::EffectScope::runtime_operation(id),
543            )
544            .expect("test effect scope"),
545        )
546    }
547
548    #[tokio::test]
549    async fn default_process_cancel_ability_validates_visibility_and_calls_primitive() {
550        let service =
551            RecordingProcessService::new(["process-1".to_string()], cancelled_record("process-1"));
552
553        let record = DefaultProcessCancelAbility
554            .cancel(
555                &service,
556                ProcessCancelRequest::new(
557                    "session-1",
558                    "process-1",
559                    test_process_scope("cancel-visible"),
560                    ProcessCancelSource::HostApi,
561                ),
562            )
563            .await
564            .expect("cancel process");
565
566        assert_eq!(record.status.label(), "cancelled");
567        assert_eq!(
568            service.validate_calls(),
569            vec![vec!["process-1".to_string()]]
570        );
571        assert_eq!(service.cancel_calls(), vec!["process-1".to_string()]);
572    }
573
574    #[tokio::test]
575    async fn default_process_cancel_ability_rejects_invisible_process_without_cancel() {
576        let service = RecordingProcessService::new(Vec::<String>::new(), cancelled_record("p1"));
577
578        let err = DefaultProcessCancelAbility
579            .cancel(
580                &service,
581                ProcessCancelRequest::new(
582                    "session-1",
583                    "p1",
584                    test_process_scope("cancel-hidden"),
585                    ProcessCancelSource::Tool,
586                ),
587            )
588            .await
589            .expect_err("hidden process should be rejected");
590
591        assert!(err.to_string().contains("not visible"), "{err}");
592        assert!(service.cancel_calls().is_empty());
593    }
594
595    #[tokio::test]
596    async fn process_cancel_ability_cancel_all_visible_uses_same_cancel_path() {
597        let service = RecordingProcessService::new(
598            ["process-1".to_string(), "process-2".to_string()],
599            cancelled_record("template"),
600        )
601        .with_visible_entries(["process-1".to_string(), "process-2".to_string()]);
602        let ability = RecordingCancelAbility::default();
603
604        let summaries = ability
605            .cancel_all_visible(
606                &service,
607                ProcessCancelAllRequest::new(
608                    "session-1",
609                    test_process_scope("cancel-all"),
610                    ProcessCancelSource::Tool,
611                )
612                .with_reason("requested by tool"),
613            )
614            .await
615            .expect("cancel all visible");
616
617        assert_eq!(
618            summaries
619                .iter()
620                .map(|summary| summary.process_id.as_str())
621                .collect::<Vec<_>>(),
622            vec!["process-1", "process-2"]
623        );
624        assert_eq!(
625            ability.requests(),
626            vec![
627                (
628                    "process-1".to_string(),
629                    ProcessCancelSource::Tool,
630                    Some("requested by tool".to_string())
631                ),
632                (
633                    "process-2".to_string(),
634                    ProcessCancelSource::Tool,
635                    Some("requested by tool".to_string())
636                )
637            ]
638        );
639        assert_eq!(
640            service.validate_calls(),
641            vec![vec!["process-1".to_string()], vec!["process-2".to_string()]]
642        );
643        assert_eq!(
644            service.cancel_calls(),
645            vec!["process-1".to_string(), "process-2".to_string()]
646        );
647    }
648}