opensymphony 1.3.2

A Rust implementation of the OpenAI Symphony orchestration design
Documentation
pub(crate) mod backends;
mod config;
mod snapshot;

use std::{collections::VecDeque, path::PathBuf, process::ExitCode, sync::Arc};

use crate::opensymphony_control::{ControlPlaneServer, RecentEventKind, SnapshotStore};
use crate::opensymphony_domain::TimestampMs;
use crate::opensymphony_linear::LinearError;
use crate::opensymphony_openhands::OpenHandsError;
use crate::opensymphony_orchestrator::{Scheduler, SchedulerConfig, SchedulerError};
use crate::opensymphony_workspace::WorkspaceError;
use chrono::{DateTime, Utc};
use clap::Args;
use thiserror::Error;
use tokio::{
    net::TcpListener,
    time::{MissedTickBehavior, interval},
};
use tracing::{info, warn};

use self::{
    backends::{
        RuntimeWorkerBackend, RuntimeWorkspaceBackend, build_runtime_transport,
        build_tracker_backend, build_workspace_manager_config,
    },
    config::resolve_runtime_config,
    snapshot::{current_agent_server_status, map_snapshot, push_recent_event, terminal_state_set},
};

// Command-line arguments accepted by the `run` subcommand.
// Plain `//` comments are used on purpose: clap's derive would turn `///` doc
// comments into help/about text.
#[derive(Clone, Debug, Args)]
pub struct RunArgs {
    // Optional `--config <PATH>` pointing at the runtime config YAML. `None`
    // means the flag was not given; per the help text the caller then looks
    // for ./config.yaml (that fallback lives outside this struct).
    #[arg(long, help = "Runtime config YAML path; defaults to ./config.yaml when present")]
    pub config: Option<PathBuf>,
}

/// Errors that abort `opensymphony run`.
///
/// Each message embeds its underlying cause (`{source}` / `{0}` / `{detail}`)
/// because `run_command` prints only the top-level `Display` output to stderr,
/// not the `source()` chain.
#[derive(Debug, Error)]
enum RunCommandError {
    /// The process's current working directory could not be determined.
    #[error("failed to determine the current working directory: {0}")]
    CurrentDir(#[source] std::io::Error),

    /// The runtime config file at `path` could not be read.
    #[error("failed to read {path}: {source}")]
    ReadConfig {
        path: PathBuf,
        #[source]
        source: std::io::Error,
    },

    /// The runtime config file at `path` could not be deserialized from YAML.
    #[error("failed to parse {path}: {source}")]
    ParseConfig {
        path: PathBuf,
        #[source]
        source: serde_yaml::Error,
    },

    /// Expanding values in the config at `path` failed; `detail` holds the reason.
    #[error("failed to expand {path}: {detail}")]
    ResolveConfig { path: PathBuf, detail: String },

    /// The configured control-plane bind address `value` could not be parsed.
    #[error("invalid control-plane bind address `{value}`: {source}")]
    InvalidBind {
        value: String,
        #[source]
        source: std::net::AddrParseError,
    },

    /// Loading the workflow file at `path` failed.
    #[error("failed to load workflow {path}: {source}")]
    LoadWorkflow {
        path: PathBuf,
        #[source]
        source: crate::opensymphony_workflow::WorkflowLoadError,
    },

    /// The workflow at `path` was loaded but its configuration could not be resolved.
    #[error("failed to resolve workflow {path}: {source}")]
    ResolveWorkflow {
        path: PathBuf,
        #[source]
        source: crate::opensymphony_workflow::WorkflowConfigError,
    },

    /// Building the tracker client failed; produced from [`LinearError`] by `?`.
    #[error("failed to build tracker client: {0}")]
    Tracker(#[from] LinearError),

    /// Creating the workspace manager failed; produced from [`WorkspaceError`] by `?`.
    #[error("failed to create workspace manager: {0}")]
    WorkspaceManager(#[from] WorkspaceError),

    /// An OpenHands transport/client operation failed; produced from
    /// [`OpenHandsError`] by `?`.
    #[error("failed to prepare OpenHands transport: {0}")]
    Transport(#[from] OpenHandsError),

    /// Managed local OpenHands tooling at `tool_dir` is missing or invalid; the
    /// message tells the user which install/doctor command to run.
    #[error(
        "managed local OpenHands tooling at {tool_dir} is missing or invalid: {detail}. Run `opensymphony install openhands` or `opensymphony doctor --config <path>`."
    )]
    ToolingSetupRequired { tool_dir: PathBuf, detail: String },

    /// The local OpenHands supervisor could not be started.
    #[error("failed to start local OpenHands supervisor: {0}")]
    Supervisor(#[from] crate::opensymphony_openhands::SupervisorError),

    /// Produced from [`SchedulerError`] by `?`.
    ///
    /// NOTE(review): every `SchedulerError` converted with `?` gets the
    /// "scheduler configuration" message — confirm that fits errors coming from
    /// other scheduler calls as well.
    #[error("failed to build scheduler configuration: {0}")]
    SchedulerConfig(#[from] SchedulerError),

    /// Binding the control-plane TCP listener failed.
    #[error("failed to bind control-plane listener: {0}")]
    BindListener(#[source] std::io::Error),

    /// The control-plane server task returned an I/O error or could not be joined.
    #[error("control-plane server exited unexpectedly: {0}")]
    Serve(#[source] std::io::Error),

    /// The workflow needs a managed local OpenHands server, but no
    /// `openhands.tool_dir` is configured.
    #[error(
        "workflow config requires a managed local OpenHands server, but `openhands.tool_dir` is missing from config.yaml (recommended: ~/.opensymphony/openhands-server)"
    )]
    MissingToolDir,

    /// The OpenHands transport URL `value` has neither an explicit nor a default port.
    #[error(
        "OpenHands transport URL `{value}` does not include an explicit port and has no default port"
    )]
    MissingTransportPort { value: String },
}

/// Entry point for the `run` subcommand.
///
/// Runs the orchestrator to completion and maps the outcome to a process exit
/// code. `Ok` becomes `ExitCode::SUCCESS`. Any error is printed to stderr with
/// its `Display` text and becomes exit code 1.
pub async fn run_command(args: RunArgs) -> ExitCode {
    let Err(failure) = run_orchestrator(args).await else {
        return ExitCode::SUCCESS;
    };
    eprintln!("{failure}");
    ExitCode::from(1)
}

/// Runs the orchestrator daemon until shutdown.
///
/// Startup proceeds in this order:
/// 1. Resolve the runtime config.
/// 2. Build the tracker and workspace backends.
/// 3. Prepare the OpenHands transport, which may also yield a local supervisor.
/// 4. Probe the OpenHands API.
/// 5. Publish an initial snapshot.
/// 6. Bind and spawn the control-plane server.
/// 7. Run `Scheduler::bootstrap`.
///
/// The main loop then ticks the scheduler every `polling.interval_ms`
/// milliseconds. On success it publishes the resulting snapshot. On failure it
/// publishes the scheduler's current snapshot plus a warning event. The loop
/// ends on ctrl-c or when the control-plane server task finishes.
///
/// Once the transport has been built, any supervisor it produced is stopped on
/// every exit path, for clean shutdown and error returns alike.
///
/// # Errors
///
/// Returns a [`RunCommandError`] if any of these fail:
/// - config resolution or backend construction;
/// - the OpenHands probe;
/// - scheduler configuration;
/// - listener binding or bootstrap;
/// - the control-plane server task, including failure to join it.
///
/// Individual scheduler tick failures are logged and do not end the loop.
async fn run_orchestrator(args: RunArgs) -> Result<(), RunCommandError> {
    let runtime = resolve_runtime_config(&args).await?;
    info!(
        config = runtime
            .config_path
            .as_ref()
            .map(|path| path.display().to_string())
            .unwrap_or_else(|| "<none>".to_string()),
        target_repo = %runtime.target_repo.display(),
        workflow = %runtime.workflow_path.display(),
        bind = %runtime.bind,
        "starting OpenSymphony orchestrator"
    );

    let tracker = build_tracker_backend(&runtime.workflow)?;
    let workspace_manager = Arc::new(crate::opensymphony_workspace::WorkspaceManager::new(
        build_workspace_manager_config(&runtime.workflow),
    )?);
    let workspace = RuntimeWorkspaceBackend::new(workspace_manager.clone(), &runtime.workflow);

    let (transport, mut supervisor) = build_runtime_transport(&runtime).await?;

    // Everything after transport creation runs inside this async block. Its `?`s
    // and `return`s only leave the block, so the supervisor cleanup below runs on
    // every exit path, not just after a clean shutdown.
    let outcome = async {
        let client = crate::opensymphony_openhands::OpenHandsClient::new(transport);
        client.openapi_probe().await?;

        let worker = RuntimeWorkerBackend::new(
            client.clone(),
            Arc::new(runtime.workflow.clone()),
            workspace_manager,
        );
        let mut scheduler = Scheduler::new(
            tracker,
            workspace,
            worker,
            SchedulerConfig::from_workflow(&runtime.workflow)?,
        );

        // Inputs to `map_snapshot` that do not change while the daemon runs.
        let workspace_root = runtime.workflow.config.workspace.root.as_path();
        let terminal_states = terminal_state_set(&runtime.workflow);

        let mut recent_events = VecDeque::new();
        push_recent_event(
            &mut recent_events,
            RecentEventKind::SnapshotPublished,
            None,
            format!("loaded {}", runtime.workflow_path.display()),
            Utc::now(),
        );

        let initial_snapshot = map_snapshot(
            &scheduler.snapshot(now_timestamp()),
            workspace_root,
            &terminal_states,
            current_agent_server_status(&mut supervisor, client.base_url()),
            &recent_events,
        );

        let store = SnapshotStore::new(initial_snapshot);
        let listener = TcpListener::bind(runtime.bind)
            .await
            .map_err(RunCommandError::BindListener)?;
        let server = ControlPlaneServer::new(store.clone());
        let mut server_task = tokio::spawn(async move { server.serve(listener).await });

        let bootstrap_snapshot = scheduler.bootstrap(now_timestamp()).await?;
        push_recent_event(
            &mut recent_events,
            RecentEventKind::SnapshotPublished,
            None,
            format!(
                "recovered startup state; running={}, retry_queue={}",
                bootstrap_snapshot.daemon.running_issue_count,
                bootstrap_snapshot.daemon.retry_queue_count
            ),
            Utc::now(),
        );
        store
            .publish(map_snapshot(
                &bootstrap_snapshot,
                workspace_root,
                &terminal_states,
                current_agent_server_status(&mut supervisor, client.base_url()),
                &recent_events,
            ))
            .await;

        // `tokio::time::interval` panics on a zero period, so a configured
        // `interval_ms` of 0 is clamped to 1 ms instead of crashing the daemon.
        let poll_interval =
            std::time::Duration::from_millis(runtime.workflow.config.polling.interval_ms.max(1));
        let mut ticker = interval(poll_interval);
        ticker.set_missed_tick_behavior(MissedTickBehavior::Skip);

        // Register for ctrl-c once, outside the loop. Suppose a fresh `ctrl_c()`
        // future were created on every `select!` iteration. It would be dropped
        // whenever another branch won, and a ctrl-c delivered during a tick, with
        // no listener alive, would never be observed.
        let shutdown_signal = tokio::signal::ctrl_c();
        tokio::pin!(shutdown_signal);
        // Cleared if the listener itself fails; a completed future must not be
        // polled again.
        let mut listen_for_shutdown = true;

        loop {
            tokio::select! {
                // Poll branches in declaration order so that a pending shutdown or
                // server exit wins over starting another scheduler tick.
                biased;

                signal = &mut shutdown_signal, if listen_for_shutdown => {
                    match signal {
                        Ok(()) => {
                            info!("received shutdown signal");
                            break;
                        }
                        Err(error) => {
                            warn!(%error, "failed to listen for ctrl-c; continuing without it");
                            listen_for_shutdown = false;
                        }
                    }
                }
                result = &mut server_task => {
                    match result {
                        Ok(Ok(())) => {
                            warn!("control-plane server stopped; shutting down orchestrator");
                            break;
                        }
                        Ok(Err(error)) => return Err(RunCommandError::Serve(error)),
                        Err(error) => {
                            return Err(RunCommandError::Serve(std::io::Error::other(
                                error.to_string(),
                            )));
                        }
                    }
                }
                _ = ticker.tick() => {
                    let observed_at = now_timestamp();
                    match scheduler.tick(observed_at).await {
                        Ok(snapshot) => {
                            push_recent_event(
                                &mut recent_events,
                                RecentEventKind::SnapshotPublished,
                                None,
                                format!(
                                    "polled tracker; running={}, retry_queue={}",
                                    snapshot.daemon.running_issue_count,
                                    snapshot.daemon.retry_queue_count
                                ),
                                Utc::now(),
                            );
                            store.publish(map_snapshot(
                                &snapshot,
                                workspace_root,
                                &terminal_states,
                                current_agent_server_status(&mut supervisor, client.base_url()),
                                &recent_events,
                            )).await;
                        }
                        Err(error) => {
                            warn!(%error, "scheduler tick failed");
                            push_recent_event(
                                &mut recent_events,
                                RecentEventKind::Warning,
                                None,
                                format!("scheduler tick failed: {error}"),
                                Utc::now(),
                            );
                            // Still publish, so the control plane sees the warning
                            // event and the scheduler's current state.
                            let snapshot = scheduler.snapshot(observed_at);
                            store.publish(map_snapshot(
                                &snapshot,
                                workspace_root,
                                &terminal_states,
                                current_agent_server_status(&mut supervisor, client.base_url()),
                                &recent_events,
                            )).await;
                        }
                    }
                }
            }
        }

        Ok::<(), RunCommandError>(())
    }
    .await;

    if let Some(mut supervisor) = supervisor {
        // Best-effort cleanup; the result is discarded so it cannot mask `outcome`.
        // NOTE(review): if `stop` returns a `Result`, consider logging its error.
        let _ = supervisor.stop();
    }

    outcome
}

/// Converts a millisecond [`TimestampMs`] into a UTC [`DateTime`].
///
/// Some values cannot be represented as a `DateTime<Utc>`: values above
/// `i64::MAX`, and values outside chrono's supported range. These fall back to
/// the current time. Values above `i64::MAX` are rejected explicitly rather
/// than wrapped into negative, pre-1970 instants by a narrowing cast.
pub(super) fn timestamp_to_datetime(value: TimestampMs) -> DateTime<Utc> {
    i64::try_from(value.as_u64())
        .ok()
        .and_then(DateTime::<Utc>::from_timestamp_millis)
        .unwrap_or_else(Utc::now)
}

/// Converts a UTC [`DateTime`] into a millisecond [`TimestampMs`].
///
/// Instants before the Unix epoch (negative milliseconds) are clamped to 0.
pub(super) fn datetime_to_timestamp_ms(value: DateTime<Utc>) -> TimestampMs {
    // A negative millisecond count fails the conversion and becomes 0;
    // non-negative counts convert losslessly.
    let millis = u64::try_from(value.timestamp_millis()).unwrap_or(0);
    TimestampMs::new(millis)
}

pub(super) fn now_timestamp() -> TimestampMs {
    TimestampMs::new(Utc::now().timestamp_millis().max(0) as u64)
}