roder-tasks 0.1.1

Agentic software development tools and SDKs for Roder.
Documentation
use std::sync::Arc;
use std::sync::atomic::{AtomicBool, Ordering};

use roder_api::processes::{ProcessDescriptor, ProcessOrigin, ProcessState, ProcessStopper};
use roder_api::tasks::TaskOutputStream;
use roder_tasks::{ProcessRegistry, ProcessRegistryConfig};
use time::OffsetDateTime;

struct FlagStopper(Arc<AtomicBool>);

#[async_trait::async_trait]
impl ProcessStopper for FlagStopper {
    async fn stop(&self, _reason: Option<String>) -> anyhow::Result<()> {
        self.0.store(true, Ordering::SeqCst);
        Ok(())
    }
}

fn descriptor(process_id: &str, task_id: Option<&str>) -> ProcessDescriptor {
    ProcessDescriptor {
        process_id: process_id.to_string(),
        origin: ProcessOrigin::BackgroundTask,
        state: ProcessState::Running,
        command: vec!["sleep".to_string(), "10".to_string()],
        command_summary: "sleep 10".to_string(),
        cwd: Some("/tmp".to_string()),
        pid: Some(1234),
        task_id: task_id.map(str::to_string),
        thread_id: Some("thread-1".to_string()),
        turn_id: Some("turn-1".to_string()),
        runner_destination_id: None,
        runner_session_id: None,
        stoppable: true,
        started_at: OffsetDateTime::UNIX_EPOCH,
        updated_at: OffsetDateTime::UNIX_EPOCH,
        stdout_tail: None,
        stderr_tail: None,
    }
}

#[tokio::test]
async fn process_registry_registers_lists_outputs_and_marks_exit() {
    let registry = ProcessRegistry::default();
    let mut events = registry.subscribe();
    registry
        .register(descriptor("process-1", Some("task-1")), None)
        .await
        .unwrap();

    assert_eq!(registry.list(false).await.len(), 1);
    registry
        .append_task_output(
            "task-1",
            TaskOutputStream::Stdout,
            "ready\n".to_string(),
            0,
            Some("thread-1".to_string()),
            Some("turn-1".to_string()),
        )
        .await
        .unwrap();
    assert_eq!(registry.output("process-1").await.len(), 1);
    assert_eq!(
        registry
            .get("process-1")
            .await
            .unwrap()
            .stdout_tail
            .as_deref(),
        Some("ready\n")
    );

    registry.mark_exited("process-1", Some(0)).await.unwrap();
    assert!(registry.list(false).await.is_empty());
    assert_eq!(registry.list(true).await.len(), 1);

    let mut kinds = Vec::new();
    for _ in 0..3 {
        kinds.push(events.recv().await.unwrap().kind().to_string());
    }
    assert_eq!(
        kinds,
        vec!["process.started", "process.output", "process.exited"]
    );
}

#[tokio::test]
async fn process_registry_stop_is_idempotent_and_uses_stopper() {
    let registry = ProcessRegistry::default();
    let stopped = Arc::new(AtomicBool::new(false));
    registry
        .register(
            descriptor("process-1", None),
            Some(Arc::new(FlagStopper(Arc::clone(&stopped)))),
        )
        .await
        .unwrap();

    let result = registry
        .stop("process-1", Some("test".to_string()))
        .await
        .unwrap();
    assert!(result.stopped);
    assert!(stopped.load(Ordering::SeqCst));

    registry
        .mark_stopped("process-1", Some("test".to_string()))
        .await
        .unwrap();
    let result = registry.stop("process-1", None).await.unwrap();
    assert!(!result.stopped);
}

#[tokio::test]
async fn process_registry_retains_only_recent_completed_descriptors() {
    let registry = ProcessRegistry::new(ProcessRegistryConfig {
        max_completed: 1,
        max_output_bytes: 1024,
    });
    registry
        .register(descriptor("old", None), None)
        .await
        .unwrap();
    registry.mark_exited("old", Some(0)).await.unwrap();
    registry
        .register(descriptor("new", None), None)
        .await
        .unwrap();
    registry.mark_exited("new", Some(0)).await.unwrap();

    let listed = registry.list(true).await;
    assert_eq!(listed.len(), 1);
    assert_eq!(listed[0].process_id, "new");
}