1use crate::plugin::PluginError;
2
3use super::events::{ProcessAwaitOutput, ProcessEvent};
4use super::model::{
5 ProcessCancelSummary, ProcessHandleGrantEntry, ProcessHandleSummary, ProcessListMode,
6 ProcessOpScope, ProcessRecord, ProcessRegistration, ProcessStartOptions, ProcessStartRequest,
7};
8
9#[derive(Clone, Copy, Debug, PartialEq, Eq)]
10pub enum ProcessCancelSource {
11 Tool,
12 Lashlang,
13 HostApi,
14}
15
16#[derive(Clone)]
17pub struct ProcessCancelRequest<'scope> {
18 pub session_id: &'scope str,
19 pub process_id: &'scope str,
20 pub handle: Option<serde_json::Value>,
21 pub scope: ProcessOpScope<'scope>,
22 pub reason: Option<String>,
23 pub source: ProcessCancelSource,
24}
25
26impl<'scope> ProcessCancelRequest<'scope> {
27 pub fn new(
28 session_id: &'scope str,
29 process_id: &'scope str,
30 scope: ProcessOpScope<'scope>,
31 source: ProcessCancelSource,
32 ) -> Self {
33 Self {
34 session_id,
35 process_id,
36 handle: None,
37 scope,
38 reason: None,
39 source,
40 }
41 }
42
43 pub fn with_handle(mut self, handle: serde_json::Value) -> Self {
44 self.handle = Some(handle);
45 self
46 }
47
48 pub fn with_reason(mut self, reason: impl Into<String>) -> Self {
49 self.reason = Some(reason.into());
50 self
51 }
52}
53
54#[derive(Clone)]
55pub struct ProcessCancelAllRequest<'scope> {
56 pub session_id: &'scope str,
57 pub scope: ProcessOpScope<'scope>,
58 pub source: ProcessCancelSource,
59 pub reason: Option<String>,
60}
61
62impl<'scope> ProcessCancelAllRequest<'scope> {
63 pub fn new(
64 session_id: &'scope str,
65 scope: ProcessOpScope<'scope>,
66 source: ProcessCancelSource,
67 ) -> Self {
68 Self {
69 session_id,
70 scope,
71 source,
72 reason: None,
73 }
74 }
75
76 pub fn with_reason(mut self, reason: impl Into<String>) -> Self {
77 self.reason = Some(reason.into());
78 self
79 }
80}
81
82#[async_trait::async_trait]
83pub trait ProcessCancelAbility: Send + Sync {
84 async fn cancel(
85 &self,
86 processes: &dyn ProcessService,
87 request: ProcessCancelRequest<'_>,
88 ) -> Result<ProcessRecord, PluginError>;
89
90 async fn cancel_summary(
91 &self,
92 processes: &dyn ProcessService,
93 request: ProcessCancelRequest<'_>,
94 ) -> Result<ProcessCancelSummary, PluginError> {
95 self.cancel(processes, request)
96 .await
97 .map(ProcessCancelSummary::from_record)
98 }
99
100 async fn cancel_all_visible(
101 &self,
102 processes: &dyn ProcessService,
103 request: ProcessCancelAllRequest<'_>,
104 ) -> Result<Vec<ProcessCancelSummary>, PluginError> {
105 let entries = processes
106 .list_visible(
107 request.session_id,
108 ProcessListMode::Live,
109 request.scope.clone(),
110 )
111 .await?;
112 let mut cancelled = Vec::new();
113 for (grant, record) in entries {
114 if record.is_terminal() {
115 continue;
116 }
117 let mut cancel_request = ProcessCancelRequest::new(
118 request.session_id,
119 &grant.process_id,
120 request.scope.clone(),
121 request.source,
122 );
123 if let Some(reason) = request.reason.clone() {
124 cancel_request = cancel_request.with_reason(reason);
125 }
126 cancelled.push(self.cancel_summary(processes, cancel_request).await?);
127 }
128 Ok(cancelled)
129 }
130}
131
132#[derive(Clone, Copy, Debug, Default)]
133pub struct DefaultProcessCancelAbility;
134
135#[async_trait::async_trait]
136impl ProcessCancelAbility for DefaultProcessCancelAbility {
137 async fn cancel(
138 &self,
139 processes: &dyn ProcessService,
140 request: ProcessCancelRequest<'_>,
141 ) -> Result<ProcessRecord, PluginError> {
142 let process_ids = [request.process_id.to_string()];
143 processes
144 .validate_visible(request.session_id, &process_ids, request.scope.clone())
145 .await?;
146 processes
147 .cancel(request.session_id, request.process_id, request.scope)
148 .await
149 }
150}
151
152#[async_trait::async_trait]
153pub trait ProcessService: Send + Sync {
154 async fn start_from_request(
155 &self,
156 session_id: &str,
157 request: ProcessStartRequest,
158 scope: ProcessOpScope<'_>,
159 ) -> Result<ProcessHandleSummary, PluginError> {
160 let _ = (session_id, request, scope);
161 Err(PluginError::Session(
162 "process start request composition is unavailable in this service".to_string(),
163 ))
164 }
165
166 async fn start(
167 &self,
168 session_id: &str,
169 registration: ProcessRegistration,
170 options: ProcessStartOptions,
171 scope: ProcessOpScope<'_>,
172 ) -> Result<ProcessRecord, PluginError>;
173
174 async fn await_process(
175 &self,
176 process_id: &str,
177 scope: ProcessOpScope<'_>,
178 ) -> Result<ProcessAwaitOutput, PluginError>;
179
180 async fn list_visible(
181 &self,
182 session_id: &str,
183 mode: ProcessListMode,
184 scope: ProcessOpScope<'_>,
185 ) -> Result<Vec<ProcessHandleGrantEntry>, PluginError>;
186
187 async fn validate_visible(
188 &self,
189 session_id: &str,
190 process_ids: &[String],
191 scope: ProcessOpScope<'_>,
192 ) -> Result<(), PluginError>;
193
194 async fn cancel(
195 &self,
196 session_id: &str,
197 process_id: &str,
198 scope: ProcessOpScope<'_>,
199 ) -> Result<ProcessRecord, PluginError>;
200
201 async fn signal(
202 &self,
203 session_id: &str,
204 process_id: &str,
205 signal_name: String,
206 signal_id: String,
207 payload: serde_json::Value,
208 scope: ProcessOpScope<'_>,
209 ) -> Result<ProcessEvent, PluginError>;
210
211 async fn transfer(
212 &self,
213 from_session_id: &str,
214 to_session_id: &str,
215 process_ids: Vec<String>,
216 scope: ProcessOpScope<'_>,
217 ) -> Result<(), PluginError>;
218
219 async fn cancel_unreferenced(
220 &self,
221 session_id: &str,
222 keep_process_ids: Vec<String>,
223 scope: ProcessOpScope<'_>,
224 ) -> Result<Vec<ProcessRecord>, PluginError>;
225}
226
227pub struct UnavailableProcessService;
228
229#[async_trait::async_trait]
230impl ProcessService for UnavailableProcessService {
231 async fn start(
232 &self,
233 _session_id: &str,
234 _registration: ProcessRegistration,
235 _options: ProcessStartOptions,
236 _scope: ProcessOpScope<'_>,
237 ) -> Result<ProcessRecord, PluginError> {
238 Err(PluginError::Session(
239 "processes are unavailable in this runtime".to_string(),
240 ))
241 }
242
243 async fn await_process(
244 &self,
245 _process_id: &str,
246 _scope: ProcessOpScope<'_>,
247 ) -> Result<ProcessAwaitOutput, PluginError> {
248 Err(PluginError::Session(
249 "process awaiting is unavailable in this runtime".to_string(),
250 ))
251 }
252
253 async fn list_visible(
254 &self,
255 _session_id: &str,
256 _mode: ProcessListMode,
257 _scope: ProcessOpScope<'_>,
258 ) -> Result<Vec<ProcessHandleGrantEntry>, PluginError> {
259 Err(PluginError::Session(
260 "process registry is unavailable in this runtime".to_string(),
261 ))
262 }
263
264 async fn validate_visible(
265 &self,
266 _session_id: &str,
267 _process_ids: &[String],
268 _scope: ProcessOpScope<'_>,
269 ) -> Result<(), PluginError> {
270 Err(PluginError::Session(
271 "process handle validation is unavailable in this runtime".to_string(),
272 ))
273 }
274
275 async fn cancel(
276 &self,
277 _session_id: &str,
278 _process_id: &str,
279 _scope: ProcessOpScope<'_>,
280 ) -> Result<ProcessRecord, PluginError> {
281 Err(PluginError::Session(
282 "process registry is unavailable in this runtime".to_string(),
283 ))
284 }
285
286 async fn signal(
287 &self,
288 _session_id: &str,
289 _process_id: &str,
290 _signal_name: String,
291 _signal_id: String,
292 _payload: serde_json::Value,
293 _scope: ProcessOpScope<'_>,
294 ) -> Result<ProcessEvent, PluginError> {
295 Err(PluginError::Session(
296 "process signalling is unavailable in this runtime".to_string(),
297 ))
298 }
299
300 async fn transfer(
301 &self,
302 _from_session_id: &str,
303 _to_session_id: &str,
304 process_ids: Vec<String>,
305 _scope: ProcessOpScope<'_>,
306 ) -> Result<(), PluginError> {
307 if process_ids.is_empty() {
308 return Ok(());
309 }
310 Err(PluginError::Session(
311 "process handle transfer is unavailable in this runtime".to_string(),
312 ))
313 }
314
315 async fn cancel_unreferenced(
316 &self,
317 _session_id: &str,
318 _keep_process_ids: Vec<String>,
319 _scope: ProcessOpScope<'_>,
320 ) -> Result<Vec<ProcessRecord>, PluginError> {
321 Err(PluginError::Session(
322 "process handle cleanup is unavailable in this runtime".to_string(),
323 ))
324 }
325}
326
327#[cfg(test)]
328mod tests {
329 use std::collections::HashSet;
330 use std::sync::{Arc, Mutex};
331
332 use serde_json::json;
333
334 use super::*;
335 use crate::{
336 ProcessAwaitOutput, ProcessEvent, ProcessHandleDescriptor, ProcessHandleGrant,
337 ProcessInput, ProcessProvenance, ProcessRegistration, ProcessStatus,
338 };
339
340 struct RecordingProcessService {
341 visible: HashSet<String>,
342 validate_calls: Mutex<Vec<Vec<String>>>,
343 cancel_calls: Mutex<Vec<String>>,
344 visible_entries: Vec<ProcessHandleGrantEntry>,
345 record: ProcessRecord,
346 }
347
348 impl RecordingProcessService {
349 fn new(visible: impl IntoIterator<Item = String>, record: ProcessRecord) -> Self {
350 Self {
351 visible: visible.into_iter().collect(),
352 validate_calls: Mutex::new(Vec::new()),
353 cancel_calls: Mutex::new(Vec::new()),
354 visible_entries: Vec::new(),
355 record,
356 }
357 }
358
359 fn with_visible_entries(mut self, process_ids: impl IntoIterator<Item = String>) -> Self {
360 self.visible_entries = process_ids
361 .into_iter()
362 .map(|process_id| {
363 (
364 ProcessHandleGrant {
365 session_id: "session-1".to_string(),
366 process_id: process_id.clone(),
367 descriptor: ProcessHandleDescriptor::new(
368 Some("test"),
369 Some(process_id.clone()),
370 ),
371 },
372 ProcessRecord::from_registration(ProcessRegistration::new(
373 process_id,
374 ProcessInput::External {
375 metadata: json!(null),
376 },
377 ProcessProvenance::host("service-test-host"),
378 )),
379 )
380 })
381 .collect();
382 self
383 }
384
385 fn validate_calls(&self) -> Vec<Vec<String>> {
386 self.validate_calls.lock().expect("validate calls").clone()
387 }
388
389 fn cancel_calls(&self) -> Vec<String> {
390 self.cancel_calls.lock().expect("cancel calls").clone()
391 }
392 }
393
394 #[derive(Default)]
395 struct RecordingCancelAbility {
396 requests: Mutex<Vec<(String, ProcessCancelSource, Option<String>)>>,
397 }
398
399 impl RecordingCancelAbility {
400 fn requests(&self) -> Vec<(String, ProcessCancelSource, Option<String>)> {
401 self.requests.lock().expect("cancel requests").clone()
402 }
403 }
404
405 #[async_trait::async_trait]
406 impl ProcessCancelAbility for RecordingCancelAbility {
407 async fn cancel(
408 &self,
409 processes: &dyn ProcessService,
410 request: ProcessCancelRequest<'_>,
411 ) -> Result<ProcessRecord, PluginError> {
412 self.requests.lock().expect("cancel requests").push((
413 request.process_id.to_string(),
414 request.source,
415 request.reason.clone(),
416 ));
417 DefaultProcessCancelAbility.cancel(processes, request).await
418 }
419 }
420
421 #[async_trait::async_trait]
422 impl ProcessService for RecordingProcessService {
423 async fn start(
424 &self,
425 _session_id: &str,
426 _registration: ProcessRegistration,
427 _options: ProcessStartOptions,
428 _scope: ProcessOpScope<'_>,
429 ) -> Result<ProcessRecord, PluginError> {
430 Err(PluginError::Session("start not implemented".to_string()))
431 }
432
433 async fn await_process(
434 &self,
435 _process_id: &str,
436 _scope: ProcessOpScope<'_>,
437 ) -> Result<ProcessAwaitOutput, PluginError> {
438 Err(PluginError::Session("await not implemented".to_string()))
439 }
440
441 async fn list_visible(
442 &self,
443 _session_id: &str,
444 _mode: ProcessListMode,
445 _scope: ProcessOpScope<'_>,
446 ) -> Result<Vec<ProcessHandleGrantEntry>, PluginError> {
447 Ok(self.visible_entries.clone())
448 }
449
450 async fn validate_visible(
451 &self,
452 _session_id: &str,
453 process_ids: &[String],
454 _scope: ProcessOpScope<'_>,
455 ) -> Result<(), PluginError> {
456 self.validate_calls
457 .lock()
458 .expect("validate calls")
459 .push(process_ids.to_vec());
460 if let Some(missing) = process_ids
461 .iter()
462 .find(|process_id| !self.visible.contains(*process_id))
463 {
464 return Err(PluginError::Session(format!(
465 "process handle `{missing}` is not visible"
466 )));
467 }
468 Ok(())
469 }
470
471 async fn cancel(
472 &self,
473 _session_id: &str,
474 process_id: &str,
475 _scope: ProcessOpScope<'_>,
476 ) -> Result<ProcessRecord, PluginError> {
477 self.cancel_calls
478 .lock()
479 .expect("cancel calls")
480 .push(process_id.to_string());
481 let mut record = self.record.clone();
482 record.id = process_id.to_string();
483 Ok(record)
484 }
485
486 async fn signal(
487 &self,
488 _session_id: &str,
489 _process_id: &str,
490 _signal_name: String,
491 _signal_id: String,
492 _payload: serde_json::Value,
493 _scope: ProcessOpScope<'_>,
494 ) -> Result<ProcessEvent, PluginError> {
495 Err(PluginError::Session("signal not implemented".to_string()))
496 }
497
498 async fn transfer(
499 &self,
500 _from_session_id: &str,
501 _to_session_id: &str,
502 _process_ids: Vec<String>,
503 _scope: ProcessOpScope<'_>,
504 ) -> Result<(), PluginError> {
505 Err(PluginError::Session("transfer not implemented".to_string()))
506 }
507
508 async fn cancel_unreferenced(
509 &self,
510 _session_id: &str,
511 _keep_process_ids: Vec<String>,
512 _scope: ProcessOpScope<'_>,
513 ) -> Result<Vec<ProcessRecord>, PluginError> {
514 Err(PluginError::Session(
515 "cancel unreferenced not implemented".to_string(),
516 ))
517 }
518 }
519
520 fn cancelled_record(process_id: &str) -> ProcessRecord {
521 let mut record = ProcessRecord::from_registration(ProcessRegistration::new(
522 process_id,
523 ProcessInput::External {
524 metadata: json!(null),
525 },
526 ProcessProvenance::host("service-test-host"),
527 ));
528 record.status = ProcessStatus::Cancelled {
529 await_output: ProcessAwaitOutput::Cancelled {
530 message: "cancelled".to_string(),
531 raw: None,
532 control: None,
533 },
534 };
535 record
536 }
537
538 fn test_process_scope(id: &str) -> ProcessOpScope<'static> {
539 ProcessOpScope::new(
540 crate::ScopedEffectController::shared(
541 Arc::new(crate::InlineRuntimeEffectController),
542 crate::EffectScope::runtime_operation(id),
543 )
544 .expect("test effect scope"),
545 )
546 }
547
548 #[tokio::test]
549 async fn default_process_cancel_ability_validates_visibility_and_calls_primitive() {
550 let service =
551 RecordingProcessService::new(["process-1".to_string()], cancelled_record("process-1"));
552
553 let record = DefaultProcessCancelAbility
554 .cancel(
555 &service,
556 ProcessCancelRequest::new(
557 "session-1",
558 "process-1",
559 test_process_scope("cancel-visible"),
560 ProcessCancelSource::HostApi,
561 ),
562 )
563 .await
564 .expect("cancel process");
565
566 assert_eq!(record.status.label(), "cancelled");
567 assert_eq!(
568 service.validate_calls(),
569 vec![vec!["process-1".to_string()]]
570 );
571 assert_eq!(service.cancel_calls(), vec!["process-1".to_string()]);
572 }
573
574 #[tokio::test]
575 async fn default_process_cancel_ability_rejects_invisible_process_without_cancel() {
576 let service = RecordingProcessService::new(Vec::<String>::new(), cancelled_record("p1"));
577
578 let err = DefaultProcessCancelAbility
579 .cancel(
580 &service,
581 ProcessCancelRequest::new(
582 "session-1",
583 "p1",
584 test_process_scope("cancel-hidden"),
585 ProcessCancelSource::Tool,
586 ),
587 )
588 .await
589 .expect_err("hidden process should be rejected");
590
591 assert!(err.to_string().contains("not visible"), "{err}");
592 assert!(service.cancel_calls().is_empty());
593 }
594
595 #[tokio::test]
596 async fn process_cancel_ability_cancel_all_visible_uses_same_cancel_path() {
597 let service = RecordingProcessService::new(
598 ["process-1".to_string(), "process-2".to_string()],
599 cancelled_record("template"),
600 )
601 .with_visible_entries(["process-1".to_string(), "process-2".to_string()]);
602 let ability = RecordingCancelAbility::default();
603
604 let summaries = ability
605 .cancel_all_visible(
606 &service,
607 ProcessCancelAllRequest::new(
608 "session-1",
609 test_process_scope("cancel-all"),
610 ProcessCancelSource::Tool,
611 )
612 .with_reason("requested by tool"),
613 )
614 .await
615 .expect("cancel all visible");
616
617 assert_eq!(
618 summaries
619 .iter()
620 .map(|summary| summary.process_id.as_str())
621 .collect::<Vec<_>>(),
622 vec!["process-1", "process-2"]
623 );
624 assert_eq!(
625 ability.requests(),
626 vec![
627 (
628 "process-1".to_string(),
629 ProcessCancelSource::Tool,
630 Some("requested by tool".to_string())
631 ),
632 (
633 "process-2".to_string(),
634 ProcessCancelSource::Tool,
635 Some("requested by tool".to_string())
636 )
637 ]
638 );
639 assert_eq!(
640 service.validate_calls(),
641 vec![vec!["process-1".to_string()], vec!["process-2".to_string()]]
642 );
643 assert_eq!(
644 service.cancel_calls(),
645 vec!["process-1".to_string(), "process-2".to_string()]
646 );
647 }
648}