Skip to main content

lash_core/plugin/
monitor.rs

1use std::collections::BTreeMap;
2use std::sync::{Arc, Mutex};
3
4use async_trait::async_trait;
5use serde::{Deserialize, Serialize};
6use tokio::io::{AsyncBufReadExt, BufReader};
7use tokio::process::Command;
8use tokio::select;
9
10use crate::ToolResult;
11use crate::monitor::{
12    MonitorArmOn, MonitorEvent, MonitorRunState, MonitorSnapshot, MonitorSpec, MonitorStatus,
13    MonitorUpdateBatch, MonitorWakePolicy,
14};
15use crate::plugin::{
16    PluginAction, PluginActionContext, PluginActionFailure, PluginActionKind, PluginError,
17    PluginFactory, PluginRegistrar, PluginSessionContext, PluginSnapshotMeta, SessionParam,
18    SessionPlugin, SnapshotReader, SnapshotWriter,
19};
20
21pub const MONITOR_PLUGIN_ID: &str = "monitor";
22
23#[derive(Default)]
24pub struct MonitorPluginFactory;
25
26impl PluginFactory for MonitorPluginFactory {
27    fn id(&self) -> &'static str {
28        MONITOR_PLUGIN_ID
29    }
30
31    fn build(&self, _ctx: &PluginSessionContext) -> Result<Arc<dyn SessionPlugin>, PluginError> {
32        Ok(Arc::new(MonitorPlugin::default()))
33    }
34}
35
/// Per-session monitor plugin.
///
/// All state lives behind a shared `Arc<Mutex<..>>`; `register` creates a fresh
/// `MonitorPlugin` around a clone of that `Arc` for every action invocation, so
/// all handlers observe and mutate the same monitors.
#[derive(Default)]
struct MonitorPlugin {
    state: Arc<Mutex<MonitorPluginState>>,
}
40
41fn tool_result_output<T>(result: ToolResult) -> Result<T, PluginActionFailure>
42where
43    T: serde::de::DeserializeOwned,
44{
45    if !result.is_success() {
46        return Err(PluginActionFailure::new(
47            result.value_for_projection().to_string(),
48        ));
49    }
50    serde_json::from_value(result.into_value_for_projection())
51        .map_err(|err| PluginActionFailure::new(format!("invalid monitor output: {err}")))
52}
53
54fn tool_result_unit(result: ToolResult) -> Result<(), PluginActionFailure> {
55    if result.is_success() {
56        Ok(())
57    } else {
58        Err(PluginActionFailure::new(
59            result.value_for_projection().to_string(),
60        ))
61    }
62}
63
/// Persisted monitor state: serialized by `snapshot` and deserialized by `restore`.
#[derive(Clone, Debug, Default, Serialize, Deserialize)]
struct MonitorSnapshotState {
    /// Saturating counter bumped by `bump_revision` whenever the plugin mutates state;
    /// reported as the snapshot revision.
    #[serde(default)]
    revision: u64,
    /// Sequence number assigned to the most recently queued `MonitorEvent`.
    #[serde(default)]
    sequence: u64,
    /// Monitors keyed by their visible (possibly owner-prefixed) id.
    #[serde(default)]
    monitors: BTreeMap<String, MonitorEntry>,
}

/// Book-keeping for a single registered monitor.
#[derive(Clone, Debug, Default, Serialize, Deserialize)]
struct MonitorEntry {
    /// Plugin that registered this monitor via `monitor.register_specs`, if any.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    owner_plugin_id: Option<String>,
    status: MonitorStatus,
    /// Pending wake-up flag; cleared by `monitor.ack_wake`, on stop, when the
    /// monitor process ends, and on restore.
    #[serde(default)]
    pending_wake: bool,
    /// PID of the live monitor process. Not serialized (`serde(skip)`), and reset to
    /// `None` on restore.
    #[serde(skip, default)]
    runtime_pid: Option<u32>,
}

/// In-memory plugin state guarded by the plugin mutex.
#[derive(Default)]
struct MonitorPluginState {
    /// Persisted portion of the state.
    snapshot: MonitorSnapshotState,
    /// Undelivered events (capped by `queue_update`, oldest dropped first), drained
    /// by `monitor.take_updates`.
    updates: Vec<MonitorEvent>,
}
90
// A monitor spec together with the id of the plugin that owns it. When `plugin_id`
// is a non-empty id other than the monitor plugin's own, the monitor is registered
// under `<plugin_id>:<spec id>`.
//
// NOTE(review): plain `//` comments are used on these argument types on purpose:
// `///` doc comments may be emitted as descriptions by the `JsonSchema` derive and
// would change the published action schemas.
#[derive(Clone, Debug, Serialize, Deserialize, crate::JsonSchema)]
pub struct OwnedMonitorSpec {
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub plugin_id: Option<String>,
    pub spec: MonitorSpec,
}

// Arguments of `monitor.register_specs`; a missing `specs` field means "no specs".
#[derive(Clone, Debug, Default, Serialize, Deserialize, crate::JsonSchema)]
pub struct RegisterSpecsArgs {
    #[serde(default)]
    pub specs: Vec<OwnedMonitorSpec>,
}

// Arguments of `monitor.start`.
#[derive(Clone, Debug, Serialize, Deserialize, crate::JsonSchema)]
pub struct StartMonitorArgs {
    pub spec: MonitorSpec,
}

// Arguments of `monitor.stop`; `id` is the monitor's registered (visible) id.
#[derive(Clone, Debug, Serialize, Deserialize, crate::JsonSchema)]
pub struct StopMonitorArgs {
    pub id: String,
}

// Empty argument object used by `monitor.status` and `monitor.take_updates`.
#[derive(Clone, Debug, Default, Serialize, Deserialize, crate::JsonSchema)]
pub struct MonitorEmptyArgs {}

// Arguments of `monitor.ack_wake`; ids that match no monitor are ignored.
#[derive(Clone, Debug, Default, Serialize, Deserialize, crate::JsonSchema)]
pub struct AckWakeArgs {
    #[serde(default)]
    pub ids: Vec<String>,
}
122
/// Marker for `monitor.register_specs`: upserts typed monitor specs for the session.
pub struct MonitorRegisterSpecsOp;
/// Marker for `monitor.status`: starts armed, non-running monitors and returns a snapshot.
pub struct MonitorStatusOp;
/// Marker for `monitor.take_updates`: drains the queued monitor events.
pub struct MonitorTakeUpdatesOp;
/// Marker for `monitor.ack_wake`: clears the pending-wake flag of the listed monitors.
pub struct MonitorAckWakeOp;
/// Marker for `monitor.start`: registers (or updates) and starts a monitor.
pub struct MonitorStartOp;
/// Marker for `monitor.stop`: stops a monitor and terminates its process group.
pub struct MonitorStopOp;

// Every monitor action requires a session; the handlers are wired up in
// `MonitorPlugin::register`.
impl PluginAction for MonitorRegisterSpecsOp {
    const NAME: &'static str = "monitor.register_specs";
    const DESCRIPTION: &'static str = "Register typed monitor specs for the current session.";
    const KIND: PluginActionKind = PluginActionKind::Command;
    const SESSION_PARAM: SessionParam = SessionParam::Required;
    type Args = RegisterSpecsArgs;
    type Output = ();
}

impl PluginAction for MonitorStatusOp {
    const NAME: &'static str = "monitor.status";
    const DESCRIPTION: &'static str = "Return current monitor status.";
    const KIND: PluginActionKind = PluginActionKind::Query;
    const SESSION_PARAM: SessionParam = SessionParam::Required;
    type Args = MonitorEmptyArgs;
    type Output = MonitorSnapshot;
}

impl PluginAction for MonitorTakeUpdatesOp {
    const NAME: &'static str = "monitor.take_updates";
    const DESCRIPTION: &'static str = "Drain pending monitor updates.";
    const KIND: PluginActionKind = PluginActionKind::Task;
    const SESSION_PARAM: SessionParam = SessionParam::Required;
    type Args = MonitorEmptyArgs;
    type Output = MonitorUpdateBatch;
}

impl PluginAction for MonitorAckWakeOp {
    const NAME: &'static str = "monitor.ack_wake";
    const DESCRIPTION: &'static str = "Acknowledge pending monitor wake-ups.";
    const KIND: PluginActionKind = PluginActionKind::Command;
    const SESSION_PARAM: SessionParam = SessionParam::Required;
    type Args = AckWakeArgs;
    type Output = ();
}

impl PluginAction for MonitorStartOp {
    const NAME: &'static str = "monitor.start";
    const DESCRIPTION: &'static str = "Start a monitor.";
    const KIND: PluginActionKind = PluginActionKind::Command;
    const SESSION_PARAM: SessionParam = SessionParam::Required;
    type Args = StartMonitorArgs;
    type Output = MonitorSnapshot;
}

impl PluginAction for MonitorStopOp {
    const NAME: &'static str = "monitor.stop";
    const DESCRIPTION: &'static str = "Stop a monitor.";
    const KIND: PluginActionKind = PluginActionKind::Command;
    const SESSION_PARAM: SessionParam = SessionParam::Required;
    type Args = StopMonitorArgs;
    type Output = MonitorSnapshot;
}
183
184impl MonitorPlugin {
185    fn lock_state(&self) -> Result<std::sync::MutexGuard<'_, MonitorPluginState>, PluginError> {
186        self.state
187            .lock()
188            .map_err(|_| PluginError::Session("monitor state poisoned".to_string()))
189    }
190
191    fn bump_revision(state: &mut MonitorPluginState) {
192        state.snapshot.revision = state.snapshot.revision.saturating_add(1);
193    }
194
195    fn visible_id(owner_plugin_id: Option<&str>, spec_id: &str) -> String {
196        match owner_plugin_id {
197            Some(plugin_id) if !plugin_id.is_empty() && plugin_id != MONITOR_PLUGIN_ID => {
198                format!("{plugin_id}:{spec_id}")
199            }
200            _ => spec_id.to_string(),
201        }
202    }
203
204    fn snapshot_from_state(state: &MonitorPluginState) -> MonitorSnapshot {
205        let mut monitors = state
206            .snapshot
207            .monitors
208            .values()
209            .map(|entry| entry.status.clone())
210            .collect::<Vec<_>>();
211        monitors.sort_by(|left, right| left.spec.id.cmp(&right.spec.id));
212        MonitorSnapshot {
213            revision: state.snapshot.revision,
214            active_count: monitors
215                .iter()
216                .filter(|status| status.state == MonitorRunState::Running)
217                .count(),
218            monitors,
219        }
220    }
221
222    fn queue_update(
223        state: &mut MonitorPluginState,
224        monitor_id: &str,
225        message: String,
226        queue_turn_input: Option<String>,
227    ) {
228        state.snapshot.sequence = state.snapshot.sequence.saturating_add(1);
229        state.updates.push(MonitorEvent {
230            sequence: state.snapshot.sequence,
231            monitor_id: monitor_id.to_string(),
232            message,
233            queue_turn_input,
234        });
235        if state.updates.len() > 128 {
236            let drop_count = state.updates.len() - 128;
237            state.updates.drain(0..drop_count);
238        }
239        Self::bump_revision(state);
240    }
241
242    fn upsert_spec(
243        state: &mut MonitorPluginState,
244        owner_plugin_id: Option<String>,
245        mut spec: MonitorSpec,
246        armed: Option<bool>,
247    ) -> Result<String, PluginError> {
248        spec.id = Self::visible_id(owner_plugin_id.as_deref(), spec.id.trim());
249        if spec.id.trim().is_empty() {
250            return Err(PluginError::Session(
251                "monitor id must be a non-empty string".to_string(),
252            ));
253        }
254        if spec.command.trim().is_empty() {
255            return Err(PluginError::Session(
256                "monitor command must be a non-empty string".to_string(),
257            ));
258        }
259        let default_armed = matches!(spec.arm_on, MonitorArmOn::SessionStart);
260        let spec_id = spec.id.clone();
261        match state.snapshot.monitors.get_mut(&spec_id) {
262            Some(entry) => {
263                let was_armed = entry.status.armed;
264                entry.owner_plugin_id = owner_plugin_id;
265                entry.status.spec = spec.clone();
266                entry.status.armed = armed.unwrap_or(was_armed || default_armed);
267            }
268            None => {
269                state.snapshot.monitors.insert(
270                    spec_id.clone(),
271                    MonitorEntry {
272                        owner_plugin_id,
273                        status: MonitorStatus {
274                            spec,
275                            armed: armed.unwrap_or(default_armed),
276                            state: MonitorRunState::Idle,
277                            last_event: None,
278                            last_error: None,
279                            last_exit_status: None,
280                            event_count: 0,
281                        },
282                        pending_wake: false,
283                        runtime_pid: None,
284                    },
285                );
286            }
287        }
288        Self::bump_revision(state);
289        Ok(spec_id)
290    }
291
292    async fn ensure_running(
293        &self,
294        session_id: &str,
295        host: Arc<dyn crate::plugin::runtime_host::TaskHost>,
296    ) -> Result<(), PluginError> {
297        let to_start = {
298            let state = self.lock_state()?;
299            state
300                .snapshot
301                .monitors
302                .values()
303                .filter(|entry| {
304                    entry.status.armed && entry.status.state != MonitorRunState::Running
305                })
306                .map(|entry| entry.status.spec.clone())
307                .collect::<Vec<_>>()
308        };
309        for spec in to_start {
310            self.start_task(session_id, host.clone(), spec).await?;
311        }
312        Ok(())
313    }
314
315    async fn start_task(
316        &self,
317        session_id: &str,
318        host: Arc<dyn crate::plugin::runtime_host::TaskHost>,
319        spec: MonitorSpec,
320    ) -> Result<(), PluginError> {
321        let task_id = format!("monitor:{}", spec.id);
322        let state = Arc::clone(&self.state);
323        let session_id_owned = session_id.to_string();
324        let spec_clone = spec.clone();
325        let task_host = host.clone();
326        let managed_spec = crate::BackgroundTaskRegistration {
327            id: task_id.clone(),
328            kind: crate::BackgroundTaskKind::Monitor,
329            producer: "monitor",
330            child_session_id: None,
331            parent_task_id: None,
332        };
333        match host
334            .spawn_managed_task(
335                session_id,
336                managed_spec,
337                Box::pin(async move {
338                    run_monitor_task(state, session_id_owned, spec_clone, task_host).await
339                }),
340            )
341            .await
342        {
343            Ok(()) => {
344                let mut state = self.lock_state()?;
345                if let Some(entry) = state.snapshot.monitors.get_mut(&spec.id) {
346                    entry.status.armed = true;
347                    entry.status.state = MonitorRunState::Running;
348                    entry.status.last_error = None;
349                    entry.status.last_exit_status = None;
350                }
351                Self::bump_revision(&mut state);
352                Ok(())
353            }
354            Err(err) if err.to_string().contains("already running") => {
355                let mut state = self.lock_state()?;
356                if let Some(entry) = state.snapshot.monitors.get_mut(&spec.id) {
357                    entry.status.state = MonitorRunState::Running;
358                }
359                Self::bump_revision(&mut state);
360                Ok(())
361            }
362            Err(err) => {
363                let mut state = self.lock_state()?;
364                if let Some(entry) = state.snapshot.monitors.get_mut(&spec.id) {
365                    entry.status.state = MonitorRunState::Failed;
366                    entry.status.last_error = Some(err.to_string());
367                    entry.status.armed = false;
368                }
369                Self::queue_update(
370                    &mut state,
371                    &spec.id,
372                    format!("Failed to start monitor: {err}"),
373                    None,
374                );
375                Err(err)
376            }
377        }
378    }
379
380    async fn handle_register_specs(
381        &self,
382        ctx: PluginActionContext,
383        args: serde_json::Value,
384    ) -> ToolResult {
385        let _ = ctx;
386        let parsed = match serde_json::from_value::<RegisterSpecsArgs>(args) {
387            Ok(parsed) => parsed,
388            Err(err) => return ToolResult::err_fmt(format_args!("invalid monitor specs: {err}")),
389        };
390        let mut state = match self.lock_state() {
391            Ok(state) => state,
392            Err(err) => return ToolResult::err_fmt(err.to_string()),
393        };
394        for owned in parsed.specs {
395            if let Err(err) = Self::upsert_spec(&mut state, owned.plugin_id, owned.spec, None) {
396                return ToolResult::err_fmt(err.to_string());
397            }
398        }
399        ToolResult::ok(serde_json::json!(Self::snapshot_from_state(&state)))
400    }
401
402    async fn handle_status(&self, ctx: PluginActionContext) -> ToolResult {
403        let Some(session_id) = ctx.session_id.as_deref() else {
404            return ToolResult::err_fmt("monitor.status requires a session");
405        };
406        if let Err(err) = self.ensure_running(session_id, ctx.host.clone()).await {
407            return ToolResult::err_fmt(err.to_string());
408        }
409        let state = match self.lock_state() {
410            Ok(state) => state,
411            Err(err) => return ToolResult::err_fmt(err.to_string()),
412        };
413        ToolResult::ok(serde_json::json!(Self::snapshot_from_state(&state)))
414    }
415
416    async fn handle_take_updates(&self, ctx: PluginActionContext) -> ToolResult {
417        let Some(session_id) = ctx.session_id.as_deref() else {
418            return ToolResult::err_fmt("monitor.take_updates requires a session");
419        };
420        if let Err(err) = self.ensure_running(session_id, ctx.host.clone()).await {
421            return ToolResult::err_fmt(err.to_string());
422        }
423        let mut state = match self.lock_state() {
424            Ok(state) => state,
425            Err(err) => return ToolResult::err_fmt(err.to_string()),
426        };
427        let active_count = state
428            .snapshot
429            .monitors
430            .values()
431            .filter(|entry| entry.status.state == MonitorRunState::Running)
432            .count();
433        ToolResult::ok(serde_json::json!(MonitorUpdateBatch {
434            revision: state.snapshot.revision,
435            active_count,
436            events: std::mem::take(&mut state.updates),
437        }))
438    }
439
440    async fn handle_ack_wake(&self, args: serde_json::Value) -> ToolResult {
441        let parsed = match serde_json::from_value::<AckWakeArgs>(args) {
442            Ok(parsed) => parsed,
443            Err(err) => return ToolResult::err_fmt(format_args!("invalid ack payload: {err}")),
444        };
445        let mut state = match self.lock_state() {
446            Ok(state) => state,
447            Err(err) => return ToolResult::err_fmt(err.to_string()),
448        };
449        for id in parsed.ids {
450            if let Some(entry) = state.snapshot.monitors.get_mut(&id) {
451                entry.pending_wake = false;
452            }
453        }
454        Self::bump_revision(&mut state);
455        ToolResult::ok(serde_json::json!(Self::snapshot_from_state(&state)))
456    }
457
458    async fn handle_start(&self, ctx: PluginActionContext, args: serde_json::Value) -> ToolResult {
459        let Some(session_id) = ctx.session_id.as_deref() else {
460            return ToolResult::err_fmt("monitor.start requires a session");
461        };
462        let parsed = match serde_json::from_value::<StartMonitorArgs>(args) {
463            Ok(parsed) => parsed,
464            Err(err) => return ToolResult::err_fmt(format_args!("invalid monitor spec: {err}")),
465        };
466        let entry_spec = {
467            let mut state = match self.lock_state() {
468                Ok(state) => state,
469                Err(err) => return ToolResult::err_fmt(err.to_string()),
470            };
471            let spec_id = match Self::upsert_spec(&mut state, None, parsed.spec.clone(), Some(true))
472            {
473                Ok(spec_id) => spec_id,
474                Err(err) => return ToolResult::err_fmt(err.to_string()),
475            };
476            let Some(entry_spec) = state
477                .snapshot
478                .monitors
479                .get(&spec_id)
480                .map(|entry| entry.status.spec.clone())
481            else {
482                return ToolResult::err_fmt("monitor registration failed");
483            };
484            entry_spec
485        };
486        if let Err(err) = self
487            .start_task(session_id, ctx.host.clone(), entry_spec)
488            .await
489        {
490            return ToolResult::err_fmt(err.to_string());
491        }
492        let state = match self.lock_state() {
493            Ok(state) => state,
494            Err(err) => return ToolResult::err_fmt(err.to_string()),
495        };
496        ToolResult::ok(serde_json::json!(Self::snapshot_from_state(&state)))
497    }
498
499    async fn handle_stop(&self, ctx: PluginActionContext, args: serde_json::Value) -> ToolResult {
500        let Some(session_id) = ctx.session_id.as_deref() else {
501            return ToolResult::err_fmt("monitor.stop requires a session");
502        };
503        let parsed = match serde_json::from_value::<StopMonitorArgs>(args) {
504            Ok(parsed) => parsed,
505            Err(err) => return ToolResult::err_fmt(format_args!("invalid stop payload: {err}")),
506        };
507        let (monitor_id, runtime_pid) = {
508            let state = match self.lock_state() {
509                Ok(state) => state,
510                Err(err) => return ToolResult::err_fmt(err.to_string()),
511            };
512            let Some(entry) = state.snapshot.monitors.get(&parsed.id) else {
513                return ToolResult::err_fmt(format_args!("unknown monitor `{}`", parsed.id));
514            };
515            (entry.status.spec.id.clone(), entry.runtime_pid)
516        };
517
518        if let Err(err) = terminate_monitor_process_tree(runtime_pid).await {
519            return ToolResult::err_fmt(err.to_string());
520        }
521
522        if let Err(err) = ctx
523            .host
524            .cancel_managed_task(session_id, &format!("monitor:{}", parsed.id))
525            .await
526        {
527            return ToolResult::err_fmt(err.to_string());
528        }
529
530        let mut state = match self.lock_state() {
531            Ok(state) => state,
532            Err(err) => return ToolResult::err_fmt(err.to_string()),
533        };
534        let Some(entry) = state.snapshot.monitors.get_mut(&parsed.id) else {
535            return ToolResult::err_fmt(format_args!("unknown monitor `{}`", parsed.id));
536        };
537        entry.status.armed = false;
538        entry.status.state = MonitorRunState::Stopped;
539        entry.pending_wake = false;
540        entry.runtime_pid = None;
541        entry.status.last_error = None;
542        entry.status.last_exit_status = None;
543        MonitorPlugin::queue_update(&mut state, &monitor_id, "Monitor stopped".to_string(), None);
544        ToolResult::ok(serde_json::json!(Self::snapshot_from_state(&state)))
545    }
546}
547
548#[async_trait]
549impl SessionPlugin for MonitorPlugin {
550    fn id(&self) -> &'static str {
551        MONITOR_PLUGIN_ID
552    }
553
554    fn register(&self, reg: &mut PluginRegistrar) -> Result<(), PluginError> {
555        let state = Arc::clone(&self.state);
556        reg.actions()
557            .typed::<MonitorRegisterSpecsOp, _, _>(move |ctx, args| {
558                let plugin = MonitorPlugin {
559                    state: Arc::clone(&state),
560                };
561                async move {
562                    tool_result_unit(
563                        plugin
564                            .handle_register_specs(
565                                ctx,
566                                serde_json::to_value(args).unwrap_or_default(),
567                            )
568                            .await,
569                    )
570                }
571            })?;
572
573        let state = Arc::clone(&self.state);
574        reg.actions()
575            .typed::<MonitorStatusOp, _, _>(move |ctx, _args| {
576                let plugin = MonitorPlugin {
577                    state: Arc::clone(&state),
578                };
579                async move { tool_result_output(plugin.handle_status(ctx).await) }
580            })?;
581
582        let state = Arc::clone(&self.state);
583        reg.actions()
584            .typed::<MonitorTakeUpdatesOp, _, _>(move |ctx, _args| {
585                let plugin = MonitorPlugin {
586                    state: Arc::clone(&state),
587                };
588                async move { tool_result_output(plugin.handle_take_updates(ctx).await) }
589            })?;
590
591        let state = Arc::clone(&self.state);
592        reg.actions()
593            .typed::<MonitorAckWakeOp, _, _>(move |_ctx, args| {
594                let plugin = MonitorPlugin {
595                    state: Arc::clone(&state),
596                };
597                async move {
598                    tool_result_unit(
599                        plugin
600                            .handle_ack_wake(serde_json::to_value(args).unwrap_or_default())
601                            .await,
602                    )
603                }
604            })?;
605
606        let state = Arc::clone(&self.state);
607        reg.actions()
608            .typed::<MonitorStartOp, _, _>(move |ctx, args| {
609                let plugin = MonitorPlugin {
610                    state: Arc::clone(&state),
611                };
612                async move {
613                    tool_result_output(
614                        plugin
615                            .handle_start(ctx, serde_json::to_value(args).unwrap_or_default())
616                            .await,
617                    )
618                }
619            })?;
620
621        let state = Arc::clone(&self.state);
622        reg.actions()
623            .typed::<MonitorStopOp, _, _>(move |ctx, args| {
624                let plugin = MonitorPlugin {
625                    state: Arc::clone(&state),
626                };
627                async move {
628                    tool_result_output(
629                        plugin
630                            .handle_stop(ctx, serde_json::to_value(args).unwrap_or_default())
631                            .await,
632                    )
633                }
634            })?;
635        Ok(())
636    }
637
638    fn snapshot(
639        &self,
640        _writer: &mut dyn SnapshotWriter,
641    ) -> Result<PluginSnapshotMeta, PluginError> {
642        let snapshot = self.lock_state()?.snapshot.clone();
643        Ok(PluginSnapshotMeta {
644            plugin_id: self.id().to_string(),
645            plugin_version: self.version().to_string(),
646            revision: snapshot.revision,
647            state: Some(serde_json::to_value(snapshot).map_err(|err| {
648                PluginError::Snapshot(format!("failed to serialize monitor snapshot: {err}"))
649            })?),
650        })
651    }
652
653    fn restore(
654        &self,
655        meta: &PluginSnapshotMeta,
656        _reader: &dyn SnapshotReader,
657    ) -> Result<(), PluginError> {
658        let snapshot = meta
659            .state
660            .clone()
661            .map(serde_json::from_value::<MonitorSnapshotState>)
662            .transpose()
663            .map_err(|err| PluginError::Snapshot(err.to_string()))?
664            .unwrap_or_default();
665        let mut state = self.lock_state()?;
666        state.snapshot = snapshot;
667        state.updates.clear();
668        for entry in state.snapshot.monitors.values_mut() {
669            entry.pending_wake = false;
670            entry.runtime_pid = None;
671            if entry.status.armed && entry.status.spec.restart_on_restore {
672                entry.status.state = MonitorRunState::Idle;
673            } else if entry.status.state == MonitorRunState::Running {
674                entry.status.state = MonitorRunState::Idle;
675                entry.status.armed = false;
676            }
677        }
678        Ok(())
679    }
680
681    fn snapshot_revision(&self) -> u64 {
682        self.state
683            .lock()
684            .map(|state| state.snapshot.revision)
685            .unwrap_or_default()
686    }
687}
688
689async fn run_monitor_task(
690    state: Arc<Mutex<MonitorPluginState>>,
691    _session_id: String,
692    spec: MonitorSpec,
693    _host: Arc<dyn crate::plugin::runtime_host::TaskHost>,
694) -> Result<(), PluginError> {
695    let timeout_deadline = (!spec.persistent)
696        .then(|| tokio::time::Instant::now() + std::time::Duration::from_millis(spec.timeout_ms));
697    let mut command = Command::new("bash");
698    command.arg("-lc").arg(&spec.command);
699    if let Some(cwd) = spec.cwd.as_ref() {
700        command.current_dir(cwd);
701    }
702    if !spec.env.is_empty() {
703        command.envs(spec.env.iter());
704    }
705    command.kill_on_drop(true);
706    command.stdout(std::process::Stdio::piped());
707    command.stderr(std::process::Stdio::piped());
708    configure_monitor_command(&mut command);
709
710    let mut child = command
711        .spawn()
712        .map_err(|err| PluginError::Session(format!("failed to start monitor process: {err}")))?;
713    let runtime_pid = child.id();
714    {
715        let mut guard = state
716            .lock()
717            .map_err(|_| PluginError::Session("monitor state poisoned".to_string()))?;
718        if let Some(entry) = guard.snapshot.monitors.get_mut(&spec.id) {
719            entry.runtime_pid = runtime_pid;
720        }
721    }
722    let stdout = child
723        .stdout
724        .take()
725        .ok_or_else(|| PluginError::Session("monitor stdout unavailable".to_string()))?;
726    let stderr = child
727        .stderr
728        .take()
729        .ok_or_else(|| PluginError::Session("monitor stderr unavailable".to_string()))?;
730    let mut stdout_lines = BufReader::new(stdout).lines();
731    let mut stderr_lines = BufReader::new(stderr).lines();
732    let mut stdout_done = false;
733    let mut stderr_done = false;
734    let mut timeout = timeout_deadline.map(|deadline| Box::pin(tokio::time::sleep_until(deadline)));
735    let mut timed_out = false;
736
737    let id = spec.id.clone();
738    let wake_policy = spec.wake_policy;
739
740    while !stdout_done || !stderr_done {
741        select! {
742            _ = timeout.as_mut().unwrap(), if timeout.is_some() => {
743                timed_out = true;
744                break;
745            }
746            line = stdout_lines.next_line(), if !stdout_done => {
747                match line.map_err(|err| PluginError::Session(format!("monitor stdout read failed: {err}")))? {
748                    Some(line) => record_monitor_line(&state, &spec, &id, wake_policy, line, true)?,
749                    None => stdout_done = true,
750                }
751            }
752            line = stderr_lines.next_line(), if !stderr_done => {
753                match line.map_err(|err| PluginError::Session(format!("monitor stderr read failed: {err}")))? {
754                    Some(line) => record_monitor_line(
755                        &state,
756                        &spec,
757                        &id,
758                        MonitorWakePolicy::Notify,
759                        line,
760                        false,
761                    )?,
762                    None => stderr_done = true,
763                }
764            }
765        }
766    }
767
768    let exit =
769        if timed_out {
770            terminate_monitor_process_tree(runtime_pid).await?;
771            child
772                .wait()
773                .await
774                .map_err(|err| PluginError::Session(format!("monitor wait failed: {err}")))?
775        } else if let Some(deadline) = timeout_deadline {
776            let remaining = deadline.saturating_duration_since(tokio::time::Instant::now());
777            match tokio::time::timeout(remaining, child.wait()).await {
778                Ok(result) => result
779                    .map_err(|err| PluginError::Session(format!("monitor wait failed: {err}")))?,
780                Err(_) => {
781                    timed_out = true;
782                    terminate_monitor_process_tree(runtime_pid).await?;
783                    child.wait().await.map_err(|err| {
784                        PluginError::Session(format!("monitor wait failed: {err}"))
785                    })?
786                }
787            }
788        } else {
789            child
790                .wait()
791                .await
792                .map_err(|err| PluginError::Session(format!("monitor wait failed: {err}")))?
793        };
794
795    let mut state = state
796        .lock()
797        .map_err(|_| PluginError::Session("monitor state poisoned".to_string()))?;
798    if let Some(entry) = state.snapshot.monitors.get_mut(&id) {
799        let was_stopped = entry.status.state == MonitorRunState::Stopped;
800        entry.status.state = if was_stopped {
801            MonitorRunState::Stopped
802        } else if timed_out {
803            MonitorRunState::Failed
804        } else if exit.success() {
805            MonitorRunState::Exited
806        } else {
807            MonitorRunState::Failed
808        };
809        entry.status.last_exit_status = exit.code();
810        entry.status.armed = false;
811        entry.runtime_pid = None;
812        if timed_out {
813            entry.status.last_error =
814                Some(format!("monitor timed out after {}ms", spec.timeout_ms));
815        } else if !exit.success() && !was_stopped {
816            entry.status.last_error = Some(format!(
817                "monitor exited with status {}",
818                exit.code().unwrap_or_default()
819            ));
820        }
821        entry.pending_wake = false;
822    }
823    if state
824        .snapshot
825        .monitors
826        .get(&id)
827        .map(|entry| entry.status.state != MonitorRunState::Stopped)
828        .unwrap_or(false)
829    {
830        MonitorPlugin::queue_update(
831            &mut state,
832            &id,
833            if timed_out {
834                format!("Monitor timed out after {}ms", spec.timeout_ms)
835            } else if exit.success() {
836                "Monitor exited".to_string()
837            } else {
838                format!(
839                    "Monitor failed with status {}",
840                    exit.code().unwrap_or_default()
841                )
842            },
843            None,
844        );
845    }
846    Ok(())
847}
848
849#[cfg(unix)]
850fn configure_monitor_command(command: &mut Command) {
851    // Put each monitor under its own session/process group so stop can
852    // terminate the whole command tree instead of just the shell wrapper.
853    unsafe {
854        command.pre_exec(|| {
855            if libc::setsid() == -1 {
856                return Err(std::io::Error::last_os_error());
857            }
858            Ok(())
859        });
860    }
861}
862
863#[cfg(not(unix))]
864fn configure_monitor_command(_command: &mut Command) {}
865
866#[cfg(unix)]
867async fn terminate_monitor_process_tree(runtime_pid: Option<u32>) -> Result<(), PluginError> {
868    let Some(pid) = runtime_pid else {
869        return Ok(());
870    };
871    let pgid = -(pid as i32);
872    send_process_group_signal(pgid, libc::SIGTERM)?;
873    tokio::time::sleep(std::time::Duration::from_millis(200)).await;
874    if process_group_exists(pgid) {
875        send_process_group_signal(pgid, libc::SIGKILL)?;
876    }
877    Ok(())
878}
879
880#[cfg(not(unix))]
881async fn terminate_monitor_process_tree(_runtime_pid: Option<u32>) -> Result<(), PluginError> {
882    Ok(())
883}
884
885#[cfg(unix)]
886fn process_group_exists(pgid: i32) -> bool {
887    // `kill(pgid, 0)` probes whether any process in the group still exists.
888    let rc = unsafe { libc::kill(pgid, 0) };
889    if rc == 0 {
890        return true;
891    }
892    let err = std::io::Error::last_os_error();
893    !matches!(err.raw_os_error(), Some(libc::ESRCH))
894}
895
896#[cfg(unix)]
897fn send_process_group_signal(pgid: i32, signal: libc::c_int) -> Result<(), PluginError> {
898    let rc = unsafe { libc::kill(pgid, signal) };
899    if rc == 0 {
900        return Ok(());
901    }
902    let err = std::io::Error::last_os_error();
903    if matches!(err.raw_os_error(), Some(libc::ESRCH)) {
904        return Ok(());
905    }
906    Err(PluginError::Session(format!(
907        "failed to signal monitor process group {pgid}: {err}"
908    )))
909}
910
911fn record_monitor_line(
912    state: &Arc<Mutex<MonitorPluginState>>,
913    spec: &MonitorSpec,
914    id: &str,
915    wake_policy: MonitorWakePolicy,
916    line: String,
917    from_stdout: bool,
918) -> Result<(), PluginError> {
919    let message = line.trim().to_string();
920    if message.is_empty() {
921        return Ok(());
922    }
923    let mut state = state
924        .lock()
925        .map_err(|_| PluginError::Session("monitor state poisoned".to_string()))?;
926    let Some(entry) = state.snapshot.monitors.get_mut(id) else {
927        return Ok(());
928    };
929    entry.status.last_event = Some(message.clone());
930    entry.status.event_count = entry.status.event_count.saturating_add(1);
931    let queue_turn_input =
932        if from_stdout && wake_policy == MonitorWakePolicy::QueueTurn && !entry.pending_wake {
933            entry.pending_wake = true;
934            Some(format!("Monitor event \"{}\": {}", spec.id, message))
935        } else {
936            None
937        };
938    MonitorPlugin::queue_update(&mut state, id, message, queue_turn_input);
939    Ok(())
940}
941
// These tests run real child processes (`sleep`) and rely on process-group termination.
// `terminate_monitor_process_tree` only implements that on unix; elsewhere it is a no-op
// and `sleep` may not exist, so the timeout path cannot be exercised reliably there.
#[cfg(all(test, unix))]
mod tests {
    use super::*;
    use crate::testing::MockSessionManager;

    /// Builds plugin state holding a single armed, running entry for `spec`.
    fn seeded_monitor_state(spec: &MonitorSpec) -> Arc<Mutex<MonitorPluginState>> {
        let mut monitors = BTreeMap::new();
        monitors.insert(
            spec.id.clone(),
            MonitorEntry {
                owner_plugin_id: None,
                status: MonitorStatus {
                    spec: spec.clone(),
                    armed: true,
                    state: MonitorRunState::Running,
                    last_event: None,
                    last_error: None,
                    last_exit_status: None,
                    event_count: 0,
                },
                pending_wake: false,
                runtime_pid: None,
            },
        );
        Arc::new(Mutex::new(MonitorPluginState {
            snapshot: MonitorSnapshotState {
                revision: 0,
                sequence: 0,
                monitors,
            },
            updates: Vec::new(),
        }))
    }

    #[tokio::test]
    async fn non_persistent_monitor_times_out_and_records_failure() {
        let spec = MonitorSpec {
            id: "slow".to_string(),
            command: "sleep 5".to_string(),
            persistent: false,
            timeout_ms: 50,
            ..Default::default()
        };
        let state = seeded_monitor_state(&spec);
        let host: Arc<dyn crate::plugin::runtime_host::RuntimeSessionHost> =
            Arc::new(MockSessionManager::default());

        // Bound the task well below the 5s sleep. If the process tree is not actually
        // killed on timeout, the test fails fast instead of quietly waiting it out.
        tokio::time::timeout(
            std::time::Duration::from_secs(3),
            run_monitor_task(state.clone(), "root".to_string(), spec, host),
        )
        .await
        .expect("timed-out monitor should be killed rather than left running")
        .expect("monitor task should complete after timeout");

        let guard = state.lock().expect("monitor state");
        let entry = guard
            .snapshot
            .monitors
            .get("slow")
            .expect("seeded monitor entry");
        assert_eq!(entry.status.state, MonitorRunState::Failed);
        assert!(!entry.status.armed);
        assert!(entry.runtime_pid.is_none());
        assert!(!entry.pending_wake);
        assert!(
            entry
                .status
                .last_error
                .as_deref()
                .is_some_and(|error| error.contains("timed out after 50ms"))
        );
        assert!(
            guard
                .updates
                .iter()
                .any(|event| event.message.contains("timed out after 50ms"))
        );
    }
}