1use std::collections::BTreeMap;
2use std::sync::{Arc, Mutex};
3
4use async_trait::async_trait;
5use serde::{Deserialize, Serialize};
6use tokio::io::{AsyncBufReadExt, BufReader};
7use tokio::process::Command;
8use tokio::select;
9
10use crate::ToolResult;
11use crate::monitor::{
12 MonitorArmOn, MonitorEvent, MonitorRunState, MonitorSnapshot, MonitorSpec, MonitorStatus,
13 MonitorUpdateBatch, MonitorWakePolicy,
14};
15use crate::plugin::{
16 PluginAction, PluginActionContext, PluginActionFailure, PluginActionKind, PluginError,
17 PluginFactory, PluginRegistrar, PluginSessionContext, PluginSnapshotMeta, SessionParam,
18 SessionPlugin, SnapshotReader, SnapshotWriter,
19};
20
/// Identifier of the monitor plugin.
///
/// Returned by both the factory and the session plugin. Specs registered with this id as
/// their owner are stored under their bare id (no `"<plugin>:"` prefix).
pub const MONITOR_PLUGIN_ID: &str = "monitor";
22
23#[derive(Default)]
24pub struct MonitorPluginFactory;
25
26impl PluginFactory for MonitorPluginFactory {
27 fn id(&self) -> &'static str {
28 MONITOR_PLUGIN_ID
29 }
30
31 fn build(&self, _ctx: &PluginSessionContext) -> Result<Arc<dyn SessionPlugin>, PluginError> {
32 Ok(Arc::new(MonitorPlugin::default()))
33 }
34}
35
/// Session plugin that runs shell-command monitors and reports their output and state.
///
/// The state sits behind `Arc<Mutex<..>>` so the per-action handles built in `register` and
/// the spawned monitor tasks (`run_monitor_task`) all operate on the same data.
#[derive(Default)]
struct MonitorPlugin {
    state: Arc<Mutex<MonitorPluginState>>,
}
40
41fn tool_result_output<T>(result: ToolResult) -> Result<T, PluginActionFailure>
42where
43 T: serde::de::DeserializeOwned,
44{
45 if !result.is_success() {
46 return Err(PluginActionFailure::new(
47 result.value_for_projection().to_string(),
48 ));
49 }
50 serde_json::from_value(result.into_value_for_projection())
51 .map_err(|err| PluginActionFailure::new(format!("invalid monitor output: {err}")))
52}
53
54fn tool_result_unit(result: ToolResult) -> Result<(), PluginActionFailure> {
55 if result.is_success() {
56 Ok(())
57 } else {
58 Err(PluginActionFailure::new(
59 result.value_for_projection().to_string(),
60 ))
61 }
62}
63
/// Persisted portion of the monitor plugin state.
///
/// `SessionPlugin::snapshot` serializes this to JSON and `restore` deserializes it back.
/// Every field carries `#[serde(default)]`, so a snapshot missing a field still loads.
#[derive(Clone, Debug, Default, Serialize, Deserialize)]
struct MonitorSnapshotState {
    /// Saturating counter advanced by `MonitorPlugin::bump_revision` on every state change.
    #[serde(default)]
    revision: u64,
    /// Sequence number assigned to the most recently queued `MonitorEvent`.
    #[serde(default)]
    sequence: u64,
    /// Registered monitors keyed by their visible id (see `MonitorPlugin::visible_id`).
    #[serde(default)]
    monitors: BTreeMap<String, MonitorEntry>,
}

/// Bookkeeping for a single registered monitor.
#[derive(Clone, Debug, Default, Serialize, Deserialize)]
struct MonitorEntry {
    /// Plugin that registered the spec, if any; omitted from the JSON when `None`.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    owner_plugin_id: Option<String>,
    /// The spec plus the run state reported to callers.
    status: MonitorStatus,
    /// Set once a `QueueTurn` stdout event has produced turn input; while set, further
    /// events carry no turn input. Cleared by `monitor.ack_wake`, stop, process exit and
    /// restore.
    #[serde(default)]
    pending_wake: bool,
    /// PID of the running monitor process, which is also its process-group id because the
    /// process calls `setsid`. Process-local, so it is never persisted (`serde(skip)`).
    #[serde(skip, default)]
    runtime_pid: Option<u32>,
}

/// Mutable plugin state shared by action handlers and running monitor tasks.
#[derive(Default)]
struct MonitorPluginState {
    /// Data that survives snapshot/restore.
    snapshot: MonitorSnapshotState,
    /// Events not yet drained by `monitor.take_updates`. `MonitorPlugin::queue_update`
    /// keeps only the newest 128, and `restore` clears the list (it is not persisted).
    updates: Vec<MonitorEvent>,
}
90
// A monitor spec paired with the plugin that owns it. When `plugin_id` is set to something
// other than an empty string or "monitor", the stored monitor id is namespaced as
// "<plugin_id>:<spec.id>" (see `MonitorPlugin::visible_id`).
// NOTE(review): these argument types deliberately use `//` comments. `///` docs would
// presumably be picked up by the `JsonSchema` derive as schema descriptions — confirm
// before converting.
#[derive(Clone, Debug, Serialize, Deserialize, crate::JsonSchema)]
pub struct OwnedMonitorSpec {
    // Owning plugin id; omitted from the JSON when `None`.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub plugin_id: Option<String>,
    pub spec: MonitorSpec,
}

// Arguments of `monitor.register_specs`; a missing `specs` field means an empty batch.
#[derive(Clone, Debug, Default, Serialize, Deserialize, crate::JsonSchema)]
pub struct RegisterSpecsArgs {
    #[serde(default)]
    pub specs: Vec<OwnedMonitorSpec>,
}

// Arguments of `monitor.start`: the spec to register (unowned) and start immediately.
#[derive(Clone, Debug, Serialize, Deserialize, crate::JsonSchema)]
pub struct StartMonitorArgs {
    pub spec: MonitorSpec,
}

// Arguments of `monitor.stop`: the visible id of the monitor to stop.
#[derive(Clone, Debug, Serialize, Deserialize, crate::JsonSchema)]
pub struct StopMonitorArgs {
    pub id: String,
}

// Argument type for actions that take no parameters.
#[derive(Clone, Debug, Default, Serialize, Deserialize, crate::JsonSchema)]
pub struct MonitorEmptyArgs {}

// Arguments of `monitor.ack_wake`: visible ids whose pending wake flag should be cleared;
// unknown ids are ignored.
#[derive(Clone, Debug, Default, Serialize, Deserialize, crate::JsonSchema)]
pub struct AckWakeArgs {
    #[serde(default)]
    pub ids: Vec<String>,
}
122
/// Action marker for `monitor.register_specs`.
pub struct MonitorRegisterSpecsOp;
/// Action marker for `monitor.status`.
pub struct MonitorStatusOp;
/// Action marker for `monitor.take_updates`.
pub struct MonitorTakeUpdatesOp;
/// Action marker for `monitor.ack_wake`.
pub struct MonitorAckWakeOp;
/// Action marker for `monitor.start`.
pub struct MonitorStartOp;
/// Action marker for `monitor.stop`.
pub struct MonitorStopOp;

// Registers (or updates) specs without starting them; all monitor actions require a session.
impl PluginAction for MonitorRegisterSpecsOp {
    const NAME: &'static str = "monitor.register_specs";
    const DESCRIPTION: &'static str = "Register typed monitor specs for the current session.";
    const KIND: PluginActionKind = PluginActionKind::Command;
    const SESSION_PARAM: SessionParam = SessionParam::Required;
    type Args = RegisterSpecsArgs;
    type Output = ();
}

// Read-only status query (its handler also starts armed, non-running monitors).
impl PluginAction for MonitorStatusOp {
    const NAME: &'static str = "monitor.status";
    const DESCRIPTION: &'static str = "Return current monitor status.";
    const KIND: PluginActionKind = PluginActionKind::Query;
    const SESSION_PARAM: SessionParam = SessionParam::Required;
    type Args = MonitorEmptyArgs;
    type Output = MonitorSnapshot;
}

// Drains the queued events; declared as a `Task` action.
impl PluginAction for MonitorTakeUpdatesOp {
    const NAME: &'static str = "monitor.take_updates";
    const DESCRIPTION: &'static str = "Drain pending monitor updates.";
    const KIND: PluginActionKind = PluginActionKind::Task;
    const SESSION_PARAM: SessionParam = SessionParam::Required;
    type Args = MonitorEmptyArgs;
    type Output = MonitorUpdateBatch;
}

// Clears pending wake flags so later events may queue turn input again.
impl PluginAction for MonitorAckWakeOp {
    const NAME: &'static str = "monitor.ack_wake";
    const DESCRIPTION: &'static str = "Acknowledge pending monitor wake-ups.";
    const KIND: PluginActionKind = PluginActionKind::Command;
    const SESSION_PARAM: SessionParam = SessionParam::Required;
    type Args = AckWakeArgs;
    type Output = ();
}

// Registers an unowned spec, arms it and starts its background task.
impl PluginAction for MonitorStartOp {
    const NAME: &'static str = "monitor.start";
    const DESCRIPTION: &'static str = "Start a monitor.";
    const KIND: PluginActionKind = PluginActionKind::Command;
    const SESSION_PARAM: SessionParam = SessionParam::Required;
    type Args = StartMonitorArgs;
    type Output = MonitorSnapshot;
}

// Terminates a monitor's process tree and cancels its background task.
impl PluginAction for MonitorStopOp {
    const NAME: &'static str = "monitor.stop";
    const DESCRIPTION: &'static str = "Stop a monitor.";
    const KIND: PluginActionKind = PluginActionKind::Command;
    const SESSION_PARAM: SessionParam = SessionParam::Required;
    type Args = StopMonitorArgs;
    type Output = MonitorSnapshot;
}
183
184impl MonitorPlugin {
185 fn lock_state(&self) -> Result<std::sync::MutexGuard<'_, MonitorPluginState>, PluginError> {
186 self.state
187 .lock()
188 .map_err(|_| PluginError::Session("monitor state poisoned".to_string()))
189 }
190
191 fn bump_revision(state: &mut MonitorPluginState) {
192 state.snapshot.revision = state.snapshot.revision.saturating_add(1);
193 }
194
195 fn visible_id(owner_plugin_id: Option<&str>, spec_id: &str) -> String {
196 match owner_plugin_id {
197 Some(plugin_id) if !plugin_id.is_empty() && plugin_id != MONITOR_PLUGIN_ID => {
198 format!("{plugin_id}:{spec_id}")
199 }
200 _ => spec_id.to_string(),
201 }
202 }
203
204 fn snapshot_from_state(state: &MonitorPluginState) -> MonitorSnapshot {
205 let mut monitors = state
206 .snapshot
207 .monitors
208 .values()
209 .map(|entry| entry.status.clone())
210 .collect::<Vec<_>>();
211 monitors.sort_by(|left, right| left.spec.id.cmp(&right.spec.id));
212 MonitorSnapshot {
213 revision: state.snapshot.revision,
214 active_count: monitors
215 .iter()
216 .filter(|status| status.state == MonitorRunState::Running)
217 .count(),
218 monitors,
219 }
220 }
221
222 fn queue_update(
223 state: &mut MonitorPluginState,
224 monitor_id: &str,
225 message: String,
226 queue_turn_input: Option<String>,
227 ) {
228 state.snapshot.sequence = state.snapshot.sequence.saturating_add(1);
229 state.updates.push(MonitorEvent {
230 sequence: state.snapshot.sequence,
231 monitor_id: monitor_id.to_string(),
232 message,
233 queue_turn_input,
234 });
235 if state.updates.len() > 128 {
236 let drop_count = state.updates.len() - 128;
237 state.updates.drain(0..drop_count);
238 }
239 Self::bump_revision(state);
240 }
241
242 fn upsert_spec(
243 state: &mut MonitorPluginState,
244 owner_plugin_id: Option<String>,
245 mut spec: MonitorSpec,
246 armed: Option<bool>,
247 ) -> Result<String, PluginError> {
248 spec.id = Self::visible_id(owner_plugin_id.as_deref(), spec.id.trim());
249 if spec.id.trim().is_empty() {
250 return Err(PluginError::Session(
251 "monitor id must be a non-empty string".to_string(),
252 ));
253 }
254 if spec.command.trim().is_empty() {
255 return Err(PluginError::Session(
256 "monitor command must be a non-empty string".to_string(),
257 ));
258 }
259 let default_armed = matches!(spec.arm_on, MonitorArmOn::SessionStart);
260 let spec_id = spec.id.clone();
261 match state.snapshot.monitors.get_mut(&spec_id) {
262 Some(entry) => {
263 let was_armed = entry.status.armed;
264 entry.owner_plugin_id = owner_plugin_id;
265 entry.status.spec = spec.clone();
266 entry.status.armed = armed.unwrap_or(was_armed || default_armed);
267 }
268 None => {
269 state.snapshot.monitors.insert(
270 spec_id.clone(),
271 MonitorEntry {
272 owner_plugin_id,
273 status: MonitorStatus {
274 spec,
275 armed: armed.unwrap_or(default_armed),
276 state: MonitorRunState::Idle,
277 last_event: None,
278 last_error: None,
279 last_exit_status: None,
280 event_count: 0,
281 },
282 pending_wake: false,
283 runtime_pid: None,
284 },
285 );
286 }
287 }
288 Self::bump_revision(state);
289 Ok(spec_id)
290 }
291
292 async fn ensure_running(
293 &self,
294 session_id: &str,
295 host: Arc<dyn crate::plugin::runtime_host::TaskHost>,
296 ) -> Result<(), PluginError> {
297 let to_start = {
298 let state = self.lock_state()?;
299 state
300 .snapshot
301 .monitors
302 .values()
303 .filter(|entry| {
304 entry.status.armed && entry.status.state != MonitorRunState::Running
305 })
306 .map(|entry| entry.status.spec.clone())
307 .collect::<Vec<_>>()
308 };
309 for spec in to_start {
310 self.start_task(session_id, host.clone(), spec).await?;
311 }
312 Ok(())
313 }
314
315 async fn start_task(
316 &self,
317 session_id: &str,
318 host: Arc<dyn crate::plugin::runtime_host::TaskHost>,
319 spec: MonitorSpec,
320 ) -> Result<(), PluginError> {
321 let task_id = format!("monitor:{}", spec.id);
322 let state = Arc::clone(&self.state);
323 let session_id_owned = session_id.to_string();
324 let spec_clone = spec.clone();
325 let task_host = host.clone();
326 let managed_spec = crate::BackgroundTaskRegistration {
327 id: task_id.clone(),
328 kind: crate::BackgroundTaskKind::Monitor,
329 producer: "monitor",
330 child_session_id: None,
331 parent_task_id: None,
332 };
333 match host
334 .spawn_managed_task(
335 session_id,
336 managed_spec,
337 Box::pin(async move {
338 run_monitor_task(state, session_id_owned, spec_clone, task_host).await
339 }),
340 )
341 .await
342 {
343 Ok(()) => {
344 let mut state = self.lock_state()?;
345 if let Some(entry) = state.snapshot.monitors.get_mut(&spec.id) {
346 entry.status.armed = true;
347 entry.status.state = MonitorRunState::Running;
348 entry.status.last_error = None;
349 entry.status.last_exit_status = None;
350 }
351 Self::bump_revision(&mut state);
352 Ok(())
353 }
354 Err(err) if err.to_string().contains("already running") => {
355 let mut state = self.lock_state()?;
356 if let Some(entry) = state.snapshot.monitors.get_mut(&spec.id) {
357 entry.status.state = MonitorRunState::Running;
358 }
359 Self::bump_revision(&mut state);
360 Ok(())
361 }
362 Err(err) => {
363 let mut state = self.lock_state()?;
364 if let Some(entry) = state.snapshot.monitors.get_mut(&spec.id) {
365 entry.status.state = MonitorRunState::Failed;
366 entry.status.last_error = Some(err.to_string());
367 entry.status.armed = false;
368 }
369 Self::queue_update(
370 &mut state,
371 &spec.id,
372 format!("Failed to start monitor: {err}"),
373 None,
374 );
375 Err(err)
376 }
377 }
378 }
379
380 async fn handle_register_specs(
381 &self,
382 ctx: PluginActionContext,
383 args: serde_json::Value,
384 ) -> ToolResult {
385 let _ = ctx;
386 let parsed = match serde_json::from_value::<RegisterSpecsArgs>(args) {
387 Ok(parsed) => parsed,
388 Err(err) => return ToolResult::err_fmt(format_args!("invalid monitor specs: {err}")),
389 };
390 let mut state = match self.lock_state() {
391 Ok(state) => state,
392 Err(err) => return ToolResult::err_fmt(err.to_string()),
393 };
394 for owned in parsed.specs {
395 if let Err(err) = Self::upsert_spec(&mut state, owned.plugin_id, owned.spec, None) {
396 return ToolResult::err_fmt(err.to_string());
397 }
398 }
399 ToolResult::ok(serde_json::json!(Self::snapshot_from_state(&state)))
400 }
401
402 async fn handle_status(&self, ctx: PluginActionContext) -> ToolResult {
403 let Some(session_id) = ctx.session_id.as_deref() else {
404 return ToolResult::err_fmt("monitor.status requires a session");
405 };
406 if let Err(err) = self.ensure_running(session_id, ctx.host.clone()).await {
407 return ToolResult::err_fmt(err.to_string());
408 }
409 let state = match self.lock_state() {
410 Ok(state) => state,
411 Err(err) => return ToolResult::err_fmt(err.to_string()),
412 };
413 ToolResult::ok(serde_json::json!(Self::snapshot_from_state(&state)))
414 }
415
416 async fn handle_take_updates(&self, ctx: PluginActionContext) -> ToolResult {
417 let Some(session_id) = ctx.session_id.as_deref() else {
418 return ToolResult::err_fmt("monitor.take_updates requires a session");
419 };
420 if let Err(err) = self.ensure_running(session_id, ctx.host.clone()).await {
421 return ToolResult::err_fmt(err.to_string());
422 }
423 let mut state = match self.lock_state() {
424 Ok(state) => state,
425 Err(err) => return ToolResult::err_fmt(err.to_string()),
426 };
427 let active_count = state
428 .snapshot
429 .monitors
430 .values()
431 .filter(|entry| entry.status.state == MonitorRunState::Running)
432 .count();
433 ToolResult::ok(serde_json::json!(MonitorUpdateBatch {
434 revision: state.snapshot.revision,
435 active_count,
436 events: std::mem::take(&mut state.updates),
437 }))
438 }
439
440 async fn handle_ack_wake(&self, args: serde_json::Value) -> ToolResult {
441 let parsed = match serde_json::from_value::<AckWakeArgs>(args) {
442 Ok(parsed) => parsed,
443 Err(err) => return ToolResult::err_fmt(format_args!("invalid ack payload: {err}")),
444 };
445 let mut state = match self.lock_state() {
446 Ok(state) => state,
447 Err(err) => return ToolResult::err_fmt(err.to_string()),
448 };
449 for id in parsed.ids {
450 if let Some(entry) = state.snapshot.monitors.get_mut(&id) {
451 entry.pending_wake = false;
452 }
453 }
454 Self::bump_revision(&mut state);
455 ToolResult::ok(serde_json::json!(Self::snapshot_from_state(&state)))
456 }
457
458 async fn handle_start(&self, ctx: PluginActionContext, args: serde_json::Value) -> ToolResult {
459 let Some(session_id) = ctx.session_id.as_deref() else {
460 return ToolResult::err_fmt("monitor.start requires a session");
461 };
462 let parsed = match serde_json::from_value::<StartMonitorArgs>(args) {
463 Ok(parsed) => parsed,
464 Err(err) => return ToolResult::err_fmt(format_args!("invalid monitor spec: {err}")),
465 };
466 let entry_spec = {
467 let mut state = match self.lock_state() {
468 Ok(state) => state,
469 Err(err) => return ToolResult::err_fmt(err.to_string()),
470 };
471 let spec_id = match Self::upsert_spec(&mut state, None, parsed.spec.clone(), Some(true))
472 {
473 Ok(spec_id) => spec_id,
474 Err(err) => return ToolResult::err_fmt(err.to_string()),
475 };
476 let Some(entry_spec) = state
477 .snapshot
478 .monitors
479 .get(&spec_id)
480 .map(|entry| entry.status.spec.clone())
481 else {
482 return ToolResult::err_fmt("monitor registration failed");
483 };
484 entry_spec
485 };
486 if let Err(err) = self
487 .start_task(session_id, ctx.host.clone(), entry_spec)
488 .await
489 {
490 return ToolResult::err_fmt(err.to_string());
491 }
492 let state = match self.lock_state() {
493 Ok(state) => state,
494 Err(err) => return ToolResult::err_fmt(err.to_string()),
495 };
496 ToolResult::ok(serde_json::json!(Self::snapshot_from_state(&state)))
497 }
498
499 async fn handle_stop(&self, ctx: PluginActionContext, args: serde_json::Value) -> ToolResult {
500 let Some(session_id) = ctx.session_id.as_deref() else {
501 return ToolResult::err_fmt("monitor.stop requires a session");
502 };
503 let parsed = match serde_json::from_value::<StopMonitorArgs>(args) {
504 Ok(parsed) => parsed,
505 Err(err) => return ToolResult::err_fmt(format_args!("invalid stop payload: {err}")),
506 };
507 let (monitor_id, runtime_pid) = {
508 let state = match self.lock_state() {
509 Ok(state) => state,
510 Err(err) => return ToolResult::err_fmt(err.to_string()),
511 };
512 let Some(entry) = state.snapshot.monitors.get(&parsed.id) else {
513 return ToolResult::err_fmt(format_args!("unknown monitor `{}`", parsed.id));
514 };
515 (entry.status.spec.id.clone(), entry.runtime_pid)
516 };
517
518 if let Err(err) = terminate_monitor_process_tree(runtime_pid).await {
519 return ToolResult::err_fmt(err.to_string());
520 }
521
522 if let Err(err) = ctx
523 .host
524 .cancel_managed_task(session_id, &format!("monitor:{}", parsed.id))
525 .await
526 {
527 return ToolResult::err_fmt(err.to_string());
528 }
529
530 let mut state = match self.lock_state() {
531 Ok(state) => state,
532 Err(err) => return ToolResult::err_fmt(err.to_string()),
533 };
534 let Some(entry) = state.snapshot.monitors.get_mut(&parsed.id) else {
535 return ToolResult::err_fmt(format_args!("unknown monitor `{}`", parsed.id));
536 };
537 entry.status.armed = false;
538 entry.status.state = MonitorRunState::Stopped;
539 entry.pending_wake = false;
540 entry.runtime_pid = None;
541 entry.status.last_error = None;
542 entry.status.last_exit_status = None;
543 MonitorPlugin::queue_update(&mut state, &monitor_id, "Monitor stopped".to_string(), None);
544 ToolResult::ok(serde_json::json!(Self::snapshot_from_state(&state)))
545 }
546}
547
548#[async_trait]
549impl SessionPlugin for MonitorPlugin {
550 fn id(&self) -> &'static str {
551 MONITOR_PLUGIN_ID
552 }
553
554 fn register(&self, reg: &mut PluginRegistrar) -> Result<(), PluginError> {
555 let state = Arc::clone(&self.state);
556 reg.actions()
557 .typed::<MonitorRegisterSpecsOp, _, _>(move |ctx, args| {
558 let plugin = MonitorPlugin {
559 state: Arc::clone(&state),
560 };
561 async move {
562 tool_result_unit(
563 plugin
564 .handle_register_specs(
565 ctx,
566 serde_json::to_value(args).unwrap_or_default(),
567 )
568 .await,
569 )
570 }
571 })?;
572
573 let state = Arc::clone(&self.state);
574 reg.actions()
575 .typed::<MonitorStatusOp, _, _>(move |ctx, _args| {
576 let plugin = MonitorPlugin {
577 state: Arc::clone(&state),
578 };
579 async move { tool_result_output(plugin.handle_status(ctx).await) }
580 })?;
581
582 let state = Arc::clone(&self.state);
583 reg.actions()
584 .typed::<MonitorTakeUpdatesOp, _, _>(move |ctx, _args| {
585 let plugin = MonitorPlugin {
586 state: Arc::clone(&state),
587 };
588 async move { tool_result_output(plugin.handle_take_updates(ctx).await) }
589 })?;
590
591 let state = Arc::clone(&self.state);
592 reg.actions()
593 .typed::<MonitorAckWakeOp, _, _>(move |_ctx, args| {
594 let plugin = MonitorPlugin {
595 state: Arc::clone(&state),
596 };
597 async move {
598 tool_result_unit(
599 plugin
600 .handle_ack_wake(serde_json::to_value(args).unwrap_or_default())
601 .await,
602 )
603 }
604 })?;
605
606 let state = Arc::clone(&self.state);
607 reg.actions()
608 .typed::<MonitorStartOp, _, _>(move |ctx, args| {
609 let plugin = MonitorPlugin {
610 state: Arc::clone(&state),
611 };
612 async move {
613 tool_result_output(
614 plugin
615 .handle_start(ctx, serde_json::to_value(args).unwrap_or_default())
616 .await,
617 )
618 }
619 })?;
620
621 let state = Arc::clone(&self.state);
622 reg.actions()
623 .typed::<MonitorStopOp, _, _>(move |ctx, args| {
624 let plugin = MonitorPlugin {
625 state: Arc::clone(&state),
626 };
627 async move {
628 tool_result_output(
629 plugin
630 .handle_stop(ctx, serde_json::to_value(args).unwrap_or_default())
631 .await,
632 )
633 }
634 })?;
635 Ok(())
636 }
637
638 fn snapshot(
639 &self,
640 _writer: &mut dyn SnapshotWriter,
641 ) -> Result<PluginSnapshotMeta, PluginError> {
642 let snapshot = self.lock_state()?.snapshot.clone();
643 Ok(PluginSnapshotMeta {
644 plugin_id: self.id().to_string(),
645 plugin_version: self.version().to_string(),
646 revision: snapshot.revision,
647 state: Some(serde_json::to_value(snapshot).map_err(|err| {
648 PluginError::Snapshot(format!("failed to serialize monitor snapshot: {err}"))
649 })?),
650 })
651 }
652
653 fn restore(
654 &self,
655 meta: &PluginSnapshotMeta,
656 _reader: &dyn SnapshotReader,
657 ) -> Result<(), PluginError> {
658 let snapshot = meta
659 .state
660 .clone()
661 .map(serde_json::from_value::<MonitorSnapshotState>)
662 .transpose()
663 .map_err(|err| PluginError::Snapshot(err.to_string()))?
664 .unwrap_or_default();
665 let mut state = self.lock_state()?;
666 state.snapshot = snapshot;
667 state.updates.clear();
668 for entry in state.snapshot.monitors.values_mut() {
669 entry.pending_wake = false;
670 entry.runtime_pid = None;
671 if entry.status.armed && entry.status.spec.restart_on_restore {
672 entry.status.state = MonitorRunState::Idle;
673 } else if entry.status.state == MonitorRunState::Running {
674 entry.status.state = MonitorRunState::Idle;
675 entry.status.armed = false;
676 }
677 }
678 Ok(())
679 }
680
681 fn snapshot_revision(&self) -> u64 {
682 self.state
683 .lock()
684 .map(|state| state.snapshot.revision)
685 .unwrap_or_default()
686 }
687}
688
689async fn run_monitor_task(
690 state: Arc<Mutex<MonitorPluginState>>,
691 _session_id: String,
692 spec: MonitorSpec,
693 _host: Arc<dyn crate::plugin::runtime_host::TaskHost>,
694) -> Result<(), PluginError> {
695 let timeout_deadline = (!spec.persistent)
696 .then(|| tokio::time::Instant::now() + std::time::Duration::from_millis(spec.timeout_ms));
697 let mut command = Command::new("bash");
698 command.arg("-lc").arg(&spec.command);
699 if let Some(cwd) = spec.cwd.as_ref() {
700 command.current_dir(cwd);
701 }
702 if !spec.env.is_empty() {
703 command.envs(spec.env.iter());
704 }
705 command.kill_on_drop(true);
706 command.stdout(std::process::Stdio::piped());
707 command.stderr(std::process::Stdio::piped());
708 configure_monitor_command(&mut command);
709
710 let mut child = command
711 .spawn()
712 .map_err(|err| PluginError::Session(format!("failed to start monitor process: {err}")))?;
713 let runtime_pid = child.id();
714 {
715 let mut guard = state
716 .lock()
717 .map_err(|_| PluginError::Session("monitor state poisoned".to_string()))?;
718 if let Some(entry) = guard.snapshot.monitors.get_mut(&spec.id) {
719 entry.runtime_pid = runtime_pid;
720 }
721 }
722 let stdout = child
723 .stdout
724 .take()
725 .ok_or_else(|| PluginError::Session("monitor stdout unavailable".to_string()))?;
726 let stderr = child
727 .stderr
728 .take()
729 .ok_or_else(|| PluginError::Session("monitor stderr unavailable".to_string()))?;
730 let mut stdout_lines = BufReader::new(stdout).lines();
731 let mut stderr_lines = BufReader::new(stderr).lines();
732 let mut stdout_done = false;
733 let mut stderr_done = false;
734 let mut timeout = timeout_deadline.map(|deadline| Box::pin(tokio::time::sleep_until(deadline)));
735 let mut timed_out = false;
736
737 let id = spec.id.clone();
738 let wake_policy = spec.wake_policy;
739
740 while !stdout_done || !stderr_done {
741 select! {
742 _ = timeout.as_mut().unwrap(), if timeout.is_some() => {
743 timed_out = true;
744 break;
745 }
746 line = stdout_lines.next_line(), if !stdout_done => {
747 match line.map_err(|err| PluginError::Session(format!("monitor stdout read failed: {err}")))? {
748 Some(line) => record_monitor_line(&state, &spec, &id, wake_policy, line, true)?,
749 None => stdout_done = true,
750 }
751 }
752 line = stderr_lines.next_line(), if !stderr_done => {
753 match line.map_err(|err| PluginError::Session(format!("monitor stderr read failed: {err}")))? {
754 Some(line) => record_monitor_line(
755 &state,
756 &spec,
757 &id,
758 MonitorWakePolicy::Notify,
759 line,
760 false,
761 )?,
762 None => stderr_done = true,
763 }
764 }
765 }
766 }
767
768 let exit =
769 if timed_out {
770 terminate_monitor_process_tree(runtime_pid).await?;
771 child
772 .wait()
773 .await
774 .map_err(|err| PluginError::Session(format!("monitor wait failed: {err}")))?
775 } else if let Some(deadline) = timeout_deadline {
776 let remaining = deadline.saturating_duration_since(tokio::time::Instant::now());
777 match tokio::time::timeout(remaining, child.wait()).await {
778 Ok(result) => result
779 .map_err(|err| PluginError::Session(format!("monitor wait failed: {err}")))?,
780 Err(_) => {
781 timed_out = true;
782 terminate_monitor_process_tree(runtime_pid).await?;
783 child.wait().await.map_err(|err| {
784 PluginError::Session(format!("monitor wait failed: {err}"))
785 })?
786 }
787 }
788 } else {
789 child
790 .wait()
791 .await
792 .map_err(|err| PluginError::Session(format!("monitor wait failed: {err}")))?
793 };
794
795 let mut state = state
796 .lock()
797 .map_err(|_| PluginError::Session("monitor state poisoned".to_string()))?;
798 if let Some(entry) = state.snapshot.monitors.get_mut(&id) {
799 let was_stopped = entry.status.state == MonitorRunState::Stopped;
800 entry.status.state = if was_stopped {
801 MonitorRunState::Stopped
802 } else if timed_out {
803 MonitorRunState::Failed
804 } else if exit.success() {
805 MonitorRunState::Exited
806 } else {
807 MonitorRunState::Failed
808 };
809 entry.status.last_exit_status = exit.code();
810 entry.status.armed = false;
811 entry.runtime_pid = None;
812 if timed_out {
813 entry.status.last_error =
814 Some(format!("monitor timed out after {}ms", spec.timeout_ms));
815 } else if !exit.success() && !was_stopped {
816 entry.status.last_error = Some(format!(
817 "monitor exited with status {}",
818 exit.code().unwrap_or_default()
819 ));
820 }
821 entry.pending_wake = false;
822 }
823 if state
824 .snapshot
825 .monitors
826 .get(&id)
827 .map(|entry| entry.status.state != MonitorRunState::Stopped)
828 .unwrap_or(false)
829 {
830 MonitorPlugin::queue_update(
831 &mut state,
832 &id,
833 if timed_out {
834 format!("Monitor timed out after {}ms", spec.timeout_ms)
835 } else if exit.success() {
836 "Monitor exited".to_string()
837 } else {
838 format!(
839 "Monitor failed with status {}",
840 exit.code().unwrap_or_default()
841 )
842 },
843 None,
844 );
845 }
846 Ok(())
847}
848
/// Makes the monitor process call `setsid` before `exec`, so it leads a new session and
/// process group.
///
/// `terminate_monitor_process_tree` relies on this: it signals the negated pid to reach the
/// whole tree.
#[cfg(unix)]
fn configure_monitor_command(command: &mut Command) {
    // SAFETY: the `pre_exec` closure runs in the forked child before `exec`. It only calls
    // `setsid` (async-signal-safe per POSIX) and, on failure, builds an `io::Error` from
    // `errno`. It touches no locks or shared state of the parent.
    unsafe {
        command.pre_exec(|| {
            if libc::setsid() == -1 {
                return Err(std::io::Error::last_os_error());
            }
            Ok(())
        });
    }
}
862
/// Non-Unix fallback: no process-group isolation is configured for the monitor command.
#[cfg(not(unix))]
fn configure_monitor_command(_command: &mut Command) {
    // Intentionally empty: there is no `setsid` equivalent applied on this platform.
}
865
866#[cfg(unix)]
867async fn terminate_monitor_process_tree(runtime_pid: Option<u32>) -> Result<(), PluginError> {
868 let Some(pid) = runtime_pid else {
869 return Ok(());
870 };
871 let pgid = -(pid as i32);
872 send_process_group_signal(pgid, libc::SIGTERM)?;
873 tokio::time::sleep(std::time::Duration::from_millis(200)).await;
874 if process_group_exists(pgid) {
875 send_process_group_signal(pgid, libc::SIGKILL)?;
876 }
877 Ok(())
878}
879
/// Non-Unix fallback: process groups are not signalled, so this always succeeds without
/// doing anything.
///
/// NOTE(review): cleanup here presumably relies on the `kill_on_drop(true)` set when the
/// command is spawned — confirm.
#[cfg(not(unix))]
async fn terminate_monitor_process_tree(runtime_pid: Option<u32>) -> Result<(), PluginError> {
    let _ = runtime_pid;
    Ok(())
}
884
885#[cfg(unix)]
886fn process_group_exists(pgid: i32) -> bool {
887 let rc = unsafe { libc::kill(pgid, 0) };
889 if rc == 0 {
890 return true;
891 }
892 let err = std::io::Error::last_os_error();
893 !matches!(err.raw_os_error(), Some(libc::ESRCH))
894}
895
896#[cfg(unix)]
897fn send_process_group_signal(pgid: i32, signal: libc::c_int) -> Result<(), PluginError> {
898 let rc = unsafe { libc::kill(pgid, signal) };
899 if rc == 0 {
900 return Ok(());
901 }
902 let err = std::io::Error::last_os_error();
903 if matches!(err.raw_os_error(), Some(libc::ESRCH)) {
904 return Ok(());
905 }
906 Err(PluginError::Session(format!(
907 "failed to signal monitor process group {pgid}: {err}"
908 )))
909}
910
911fn record_monitor_line(
912 state: &Arc<Mutex<MonitorPluginState>>,
913 spec: &MonitorSpec,
914 id: &str,
915 wake_policy: MonitorWakePolicy,
916 line: String,
917 from_stdout: bool,
918) -> Result<(), PluginError> {
919 let message = line.trim().to_string();
920 if message.is_empty() {
921 return Ok(());
922 }
923 let mut state = state
924 .lock()
925 .map_err(|_| PluginError::Session("monitor state poisoned".to_string()))?;
926 let Some(entry) = state.snapshot.monitors.get_mut(id) else {
927 return Ok(());
928 };
929 entry.status.last_event = Some(message.clone());
930 entry.status.event_count = entry.status.event_count.saturating_add(1);
931 let queue_turn_input =
932 if from_stdout && wake_policy == MonitorWakePolicy::QueueTurn && !entry.pending_wake {
933 entry.pending_wake = true;
934 Some(format!("Monitor event \"{}\": {}", spec.id, message))
935 } else {
936 None
937 };
938 MonitorPlugin::queue_update(&mut state, id, message, queue_turn_input);
939 Ok(())
940}
941
#[cfg(test)]
mod tests {
    use super::*;
    use crate::testing::MockSessionManager;

    /// Shared state containing one armed, `Running` entry for `spec`.
    fn seeded_monitor_state(spec: &MonitorSpec) -> Arc<Mutex<MonitorPluginState>> {
        let entry = MonitorEntry {
            owner_plugin_id: None,
            status: MonitorStatus {
                spec: spec.clone(),
                armed: true,
                state: MonitorRunState::Running,
                last_event: None,
                last_error: None,
                last_exit_status: None,
                event_count: 0,
            },
            pending_wake: false,
            runtime_pid: None,
        };
        let snapshot = MonitorSnapshotState {
            revision: 0,
            sequence: 0,
            monitors: BTreeMap::from([(spec.id.clone(), entry)]),
        };
        Arc::new(Mutex::new(MonitorPluginState {
            snapshot,
            updates: Vec::new(),
        }))
    }

    #[tokio::test]
    async fn non_persistent_monitor_times_out_and_records_failure() {
        let spec = MonitorSpec {
            id: "slow".to_string(),
            command: "sleep 5".to_string(),
            persistent: false,
            timeout_ms: 50,
            ..Default::default()
        };
        let state = seeded_monitor_state(&spec);
        let host: Arc<dyn crate::plugin::runtime_host::RuntimeSessionHost> =
            Arc::new(MockSessionManager::default());

        run_monitor_task(Arc::clone(&state), "root".to_string(), spec, host)
            .await
            .expect("monitor task should complete after timeout");

        let guard = state.lock().expect("monitor state");
        let entry = guard
            .snapshot
            .monitors
            .get("slow")
            .expect("seeded monitor entry");
        let error_mentions_timeout = entry
            .status
            .last_error
            .as_deref()
            .is_some_and(|error| error.contains("timed out after 50ms"));
        let event_mentions_timeout = guard
            .updates
            .iter()
            .any(|event| event.message.contains("timed out after 50ms"));

        assert_eq!(entry.status.state, MonitorRunState::Failed);
        assert!(!entry.status.armed);
        assert!(error_mentions_timeout);
        assert!(event_mentions_timeout);
    }
}