use std::sync::Arc;
use crate::session_task::{
SessionTaskState, SessionTaskUpdate, TASK_KIND_MONITOR, TaskExecutor, TaskExecutorPlugin,
};
/// Task executor for session tasks of kind [`TASK_KIND_MONITOR`].
///
/// The type carries no state, so it is cheap to construct, copy, and share.
/// The standard derives make it usable in logs and assertions (`Debug`) and
/// in generic code that expects `Default`/`Clone`.
#[derive(Debug, Clone, Copy, Default)]
pub struct MonitorTaskExecutor;
#[async_trait::async_trait]
impl TaskExecutor for MonitorTaskExecutor {
fn kind(&self) -> &str {
TASK_KIND_MONITOR
}
async fn cancel(
&self,
task: &crate::session_task::SessionTask,
context: &crate::traits::ToolContext,
) -> crate::error::Result<()> {
if let Some(schedule_id_str) = task.spec.get("schedule_id").and_then(|v| v.as_str()) {
match crate::typed_id::ScheduleId::parse(schedule_id_str) {
Ok(schedule_id) => {
if let Some(ref store) = context.schedule_store
&& let Err(e) = store.cancel_schedule(task.session_id, schedule_id).await
{
tracing::warn!(
task_id = %task.id,
schedule_id = %schedule_id_str,
error = %e,
"MonitorTaskExecutor: cancel_schedule failed (best-effort)"
);
}
}
Err(e) => {
tracing::warn!(
task_id = %task.id,
raw = %schedule_id_str,
error = %e,
"MonitorTaskExecutor: could not parse schedule_id from spec"
);
}
}
}
if let Some(ref registry) = context.session_task_registry
&& let Err(e) = registry
.update(
task.session_id,
&task.id,
SessionTaskUpdate {
state: Some(SessionTaskState::Canceled),
summary: Some("Monitor canceled".into()),
..Default::default()
},
)
.await
{
tracing::warn!(
task_id = %task.id,
error = %e,
"MonitorTaskExecutor: failed to update task to Canceled"
);
}
Ok(())
}
}
// Submits a `TaskExecutorPlugin` for this executor via `inventory::submit!`.
// The `executor` factory closure wraps a new `MonitorTaskExecutor` in an `Arc`
// each time it is called.
// NOTE(review): presumably the task subsystem iterates the submitted plugins
// and selects an executor by `kind()` — confirm against the plugin consumer.
inventory::submit! {
TaskExecutorPlugin {
executor: || Arc::new(MonitorTaskExecutor),
}
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::capabilities::session_tasks::tests::InMemorySessionTaskRegistry;
    use crate::session_schedule::SessionSchedule;
    use crate::session_task::{
        CreateSessionTask, SessionTaskRegistry, SessionTaskState, TaskLinks, TaskWakePolicy,
    };
    use crate::traits::{SessionScheduleStore, ToolContext};
    use crate::typed_id::{ScheduleId, SessionId};
    use std::sync::{Arc, Mutex};

    /// Schedule-store test double that records every schedule id it is asked
    /// to cancel.
    #[derive(Default)]
    struct StubScheduleStore {
        canceled: Mutex<Vec<ScheduleId>>,
    }

    #[async_trait::async_trait]
    impl SessionScheduleStore for StubScheduleStore {
        /// Not exercised by the cancel tests; panics if called.
        async fn create_schedule(
            &self,
            _session_id: SessionId,
            _description: String,
            _cron_expression: Option<String>,
            _scheduled_at: Option<chrono::DateTime<chrono::Utc>>,
            _timezone: String,
        ) -> crate::error::Result<SessionSchedule> {
            unimplemented!("not needed by cancel test")
        }

        /// Records `schedule_id` and returns a disabled one-shot schedule
        /// carrying the given ids.
        async fn cancel_schedule(
            &self,
            session_id: SessionId,
            schedule_id: ScheduleId,
        ) -> crate::error::Result<SessionSchedule> {
            self.canceled.lock().unwrap().push(schedule_id);

            let canceled_schedule = SessionSchedule {
                id: schedule_id,
                session_id,
                owner_principal_id: crate::typed_id::PrincipalId::new(),
                resolved_owner_user_id: None,
                owner: None,
                effective_owner: None,
                description: String::new(),
                cron_expression: None,
                scheduled_at: None,
                timezone: "UTC".into(),
                enabled: false,
                schedule_type: crate::session_schedule::ScheduleType::OneShot,
                next_trigger_at: None,
                last_triggered_at: None,
                trigger_count: 0,
                created_at: chrono::Utc::now(),
                updated_at: chrono::Utc::now(),
            };
            Ok(canceled_schedule)
        }

        /// Always reports zero active schedules.
        async fn count_active_schedules(
            &self,
            _session_id: SessionId,
        ) -> crate::error::Result<u32> {
            Ok(0)
        }

        /// Always reports an empty schedule list.
        async fn list_schedules(
            &self,
            _session_id: SessionId,
        ) -> crate::error::Result<Vec<SessionSchedule>> {
            Ok(Vec::new())
        }
    }

    /// Builds the creation request shared by the tests: a running monitor task
    /// whose spec points at `schedule_id`.
    fn monitor_task_request(session_id: SessionId, schedule_id: ScheduleId) -> CreateSessionTask {
        CreateSessionTask {
            session_id,
            id: None,
            kind: TASK_KIND_MONITOR.to_string(),
            display_name: "Test Monitor".to_string(),
            spec: serde_json::json!({ "schedule_id": schedule_id.to_string() }),
            state: SessionTaskState::Running,
            links: TaskLinks::default(),
            wake_policy: TaskWakePolicy::Silent,
        }
    }

    /// With both a registry and a schedule store, cancel forwards the parsed
    /// schedule id to the store and marks the task as canceled.
    #[tokio::test]
    async fn cancel_reads_schedule_id_and_transitions_to_canceled() {
        let session_id = SessionId::new();
        let schedule_id = ScheduleId::new();
        let tasks = Arc::new(InMemorySessionTaskRegistry::default());
        let schedules = Arc::new(StubScheduleStore::default());

        let monitor = tasks
            .create(monitor_task_request(session_id, schedule_id))
            .await
            .unwrap();

        let ctx = ToolContext::new(session_id)
            .with_session_task_registry(tasks.clone())
            .with_schedule_store(schedules.clone());

        MonitorTaskExecutor.cancel(&monitor, &ctx).await.unwrap();

        // Exactly one cancellation was recorded, for the id from the spec.
        assert_eq!(*schedules.canceled.lock().unwrap(), [schedule_id]);

        let stored = tasks
            .get(session_id, &monitor.id)
            .await
            .unwrap()
            .expect("task should exist");
        assert_eq!(stored.state, SessionTaskState::Canceled);
        assert_eq!(stored.summary.as_deref(), Some("Monitor canceled"));
    }

    /// Without a schedule store, cancel still succeeds and marks the task as
    /// canceled.
    #[tokio::test]
    async fn cancel_succeeds_without_schedule_store() {
        let session_id = SessionId::new();
        let schedule_id = ScheduleId::new();
        let tasks = Arc::new(InMemorySessionTaskRegistry::default());

        let monitor = tasks
            .create(monitor_task_request(session_id, schedule_id))
            .await
            .unwrap();

        let ctx = ToolContext::new(session_id).with_session_task_registry(tasks.clone());

        MonitorTaskExecutor.cancel(&monitor, &ctx).await.unwrap();

        let stored = tasks
            .get(session_id, &monitor.id)
            .await
            .unwrap()
            .expect("task should exist");
        assert_eq!(stored.state, SessionTaskState::Canceled);
    }
}