harn-cli 0.8.4

CLI for the Harn programming language — run, test, REPL, format, and lint
Documentation
use std::collections::BTreeMap;
use std::path::Path;

use serde::{Deserialize, Serialize};
use time::format_description::well_known::Rfc3339;
use time::OffsetDateTime;

use super::errors::OrchestratorError;

/// File name of the JSON document holding the workflow supervisor state.
///
/// The name is joined onto a caller-supplied state directory to obtain the full path.
pub(crate) const SUPERVISOR_STATE_FILE: &str = "workflow-supervisor-state.json";

/// Root document of the supervisor state file, (de)serialized as JSON via serde.
///
/// `#[serde(default)]` lets any field be absent from the stored document; a missing
/// field takes its `Default` value when the document is parsed.
#[derive(Clone, Debug, Default, Serialize, Deserialize)]
#[serde(default)]
pub(crate) struct WorkflowSupervisorState {
    /// Version of the document layout. A value of `0` (the `Default`, e.g. when the
    /// field is missing) is rewritten to `1` by [`WorkflowSupervisorState::normalized`].
    pub schema_version: u32,
    /// Optional process record; `None` when no record is stored.
    pub process: Option<WorkflowSupervisorProcess>,
    /// Per-workflow entries. `set_workflow_status` inserts entries keyed by workflow id.
    pub workflows: BTreeMap<String, WorkflowSupervisorWorkflow>,
    /// Timestamp of the last change; `set_workflow_status` stores an RFC 3339 string here.
    pub updated_at: Option<String>,
}

/// Process record stored in [`WorkflowSupervisorState::process`].
///
/// NOTE(review): nothing in this part of the file creates or reads this record, so the
/// field notes below are inferred from the field names only — confirm against the code
/// that populates them.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub(crate) struct WorkflowSupervisorProcess {
    /// Process id (presumably of the supervisor process — TODO confirm).
    pub pid: u32,
    /// Free-form status string; the set of valid values is not visible here.
    pub status: String,
    /// Presumably the configuration file path the process was started with — TODO confirm.
    pub config_path: String,
    /// Presumably the state directory used by the process — TODO confirm.
    pub state_dir: String,
    /// Presumably the address the process binds to — TODO confirm format.
    pub bind: String,
    /// Presumably the path of the process log file — TODO confirm.
    pub log_path: String,
    /// Presumably the start timestamp — TODO confirm format.
    pub started_at: String,
    /// Presumably the stop timestamp, absent until the process has stopped — TODO confirm.
    pub stopped_at: Option<String>,
}

/// State recorded for a single workflow inside [`WorkflowSupervisorState::workflows`].
#[derive(Clone, Debug, Serialize, Deserialize)]
pub(crate) struct WorkflowSupervisorWorkflow {
    /// Identifier of the workflow this entry describes.
    pub workflow_id: String,
    /// Status string. `apply_supervisor_state` acts on `"paused"` and `"running"` and
    /// ignores every other value.
    pub status: String,
    /// Optional caller-supplied reason accompanying the status.
    pub reason: Option<String>,
    /// Hint describing how to resume or inspect this workflow.
    pub notification_hint: WorkflowSupervisorNotificationHint,
    /// Time of the last status change; `set_workflow_status` stores an RFC 3339 string.
    pub updated_at: String,
}

/// Hint attached to a workflow entry, carrying ready-made CLI command strings.
///
/// Instances built by `workflow_notification_hint` use the field contents described below.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub(crate) struct WorkflowSupervisorNotificationHint {
    /// Hint kind; `workflow_notification_hint` always sets `"burin.resume_workflow"`.
    pub kind: String,
    /// Identifier of the workflow the hint refers to.
    pub workflow_id: String,
    /// Display form of the config path, or `None` when no config path was supplied.
    pub config_path: Option<String>,
    /// Display form of the state directory.
    pub state_dir: String,
    /// `harn supervisor resume …` command line for this workflow.
    pub resume_command: String,
    /// `harn supervisor inspect …` command line for this workflow.
    pub inspect_command: String,
}

impl WorkflowSupervisorState {
    /// Returns the state with an unset schema version upgraded to `1`.
    ///
    /// A `schema_version` of `0` is what `Default` produces (for instance when the field
    /// is missing from a stored document); any other version is passed through unchanged.
    pub(crate) fn normalized(self) -> Self {
        match self.schema_version {
            0 => Self {
                schema_version: 1,
                ..self
            },
            _ => self,
        }
    }
}

/// Builds the full path of the supervisor state file located inside `state_dir`.
pub(crate) fn supervisor_state_path(state_dir: &Path) -> std::path::PathBuf {
    let mut state_file = state_dir.to_path_buf();
    state_file.push(SUPERVISOR_STATE_FILE);
    state_file
}

/// Loads the supervisor state stored in `state_dir`.
///
/// When the state file does not exist as a regular file, a fresh default state with
/// `schema_version` set to `1` is returned instead of an error. A stored document is
/// parsed as JSON and passed through [`WorkflowSupervisorState::normalized`].
///
/// # Errors
///
/// Returns an error when the file exists but cannot be read, or when its content is
/// not a valid state document.
pub(crate) fn read_supervisor_state(
    state_dir: &Path,
) -> Result<WorkflowSupervisorState, OrchestratorError> {
    let path = supervisor_state_path(state_dir);
    if path.is_file() {
        let raw = match std::fs::read_to_string(&path) {
            Ok(text) => text,
            Err(error) => {
                return Err(format!("failed to read {}: {error}", path.display()).into());
            }
        };
        match serde_json::from_str::<WorkflowSupervisorState>(&raw) {
            Ok(parsed) => Ok(parsed.normalized()),
            Err(error) => Err(format!("failed to parse {}: {error}", path.display()).into()),
        }
    } else {
        // No stored document yet: start from an empty state at the current schema version.
        Ok(WorkflowSupervisorState {
            schema_version: 1,
            ..Default::default()
        })
    }
}

/// Persists `state` as pretty-printed JSON to the supervisor state file in `state_dir`.
///
/// The directory is created (including parents) when missing. The document is first
/// written to a sibling staging file and then renamed over the real state file, so an
/// interrupted write never leaves a truncated or half-written state file behind and
/// readers see either the previous or the new document (on platforms where renaming
/// over an existing file is atomic, e.g. POSIX).
///
/// # Errors
///
/// Returns an error when the directory cannot be created, the state cannot be encoded,
/// or the file cannot be written or moved into place.
pub(crate) fn write_supervisor_state(
    state_dir: &Path,
    state: &WorkflowSupervisorState,
) -> Result<(), OrchestratorError> {
    std::fs::create_dir_all(state_dir).map_err(|error| {
        format!(
            "failed to create supervisor state dir {}: {error}",
            state_dir.display()
        )
    })?;
    let path = supervisor_state_path(state_dir);
    let encoded = serde_json::to_string_pretty(state)
        .map_err(|error| format!("failed to encode supervisor state: {error}"))?;

    // Stage next to the target so the rename stays on the same filesystem. The process
    // id keeps staging files of different processes from clobbering each other.
    let mut staging_name = path
        .file_name()
        .map(|name| name.to_os_string())
        .unwrap_or_default();
    staging_name.push(format!(".{}.tmp", std::process::id()));
    let staging_path = path.with_file_name(staging_name);

    let staged = std::fs::write(&staging_path, encoded)
        .and_then(|()| std::fs::rename(&staging_path, &path));
    if let Err(error) = staged {
        // Best-effort cleanup; the original failure is the error worth reporting.
        let _ = std::fs::remove_file(&staging_path);
        return Err(format!("failed to write {}: {error}", path.display()).into());
    }
    Ok(())
}

pub(crate) async fn apply_supervisor_state(
    state_dir: &Path,
) -> Result<WorkflowSupervisorState, OrchestratorError> {
    let state = read_supervisor_state(state_dir)?;
    for (workflow_id, workflow) in &state.workflows {
        match workflow.status.as_str() {
            "paused" => {
                harn_vm::pause(workflow_id).await.map_err(|error| {
                    format!("failed to pause workflow '{workflow_id}': {error}")
                })?;
            }
            "running" => {
                harn_vm::resume(workflow_id).await.map_err(|error| {
                    format!("failed to resume workflow '{workflow_id}': {error}")
                })?;
            }
            _ => {}
        }
    }
    Ok(state)
}

/// Records `status` (and an optional `reason`) for `workflow_id` in the in-memory state.
///
/// Builds a new workflow entry stamped with the current time and a notification hint
/// derived from `config_path` and `state_dir`, stores it under `workflow_id` (replacing
/// any earlier entry), and updates the state's `schema_version` and `updated_at`. The
/// state is only mutated in memory; nothing is written to disk here.
///
/// Returns a copy of the stored entry.
///
/// # Errors
///
/// Returns an error when the current timestamp cannot be formatted; the state is left
/// untouched in that case.
pub(crate) fn set_workflow_status(
    state: &mut WorkflowSupervisorState,
    config_path: Option<&Path>,
    state_dir: &Path,
    workflow_id: &str,
    status: &str,
    reason: Option<String>,
) -> Result<WorkflowSupervisorWorkflow, OrchestratorError> {
    let timestamp = now_rfc3339()?;
    let hint = workflow_notification_hint(config_path, state_dir, workflow_id);
    let entry = WorkflowSupervisorWorkflow {
        workflow_id: workflow_id.to_owned(),
        status: status.to_owned(),
        reason,
        notification_hint: hint,
        updated_at: timestamp.clone(),
    };
    state
        .workflows
        .insert(workflow_id.to_owned(), entry.clone());
    state.updated_at = Some(timestamp);
    state.schema_version = 1;
    Ok(entry)
}

pub(crate) fn workflow_override<'a>(
    state: &'a WorkflowSupervisorState,
    workflow_id: &str,
) -> Option<&'a WorkflowSupervisorWorkflow> {
    state.workflows.get(workflow_id)
}

/// Formats the current UTC time as an RFC 3339 string.
///
/// # Errors
///
/// Returns the formatter's message as an error when formatting fails.
pub(crate) fn now_rfc3339() -> Result<String, OrchestratorError> {
    let now = OffsetDateTime::now_utc();
    match now.format(&Rfc3339) {
        Ok(stamp) => Ok(stamp),
        Err(error) => Err(error.to_string().into()),
    }
}

/// Builds the notification hint for `workflow_id`.
///
/// The hint carries the display forms of `config_path` (if any) and `state_dir`, plus
/// ready-made `harn supervisor resume` / `harn supervisor inspect` command lines. The
/// commands include a ` --config <path>` argument only when `config_path` is given.
/// Paths and the workflow id are inserted verbatim, without any shell quoting.
pub(crate) fn workflow_notification_hint(
    config_path: Option<&Path>,
    state_dir: &Path,
    workflow_id: &str,
) -> WorkflowSupervisorNotificationHint {
    let (config_path_string, config_arg) = match config_path {
        Some(path) => {
            let shown = path.display().to_string();
            let arg = format!(" --config {shown}");
            (Some(shown), arg)
        }
        None => (None, String::new()),
    };
    let state_dir = state_dir.display().to_string();
    let resume_command = format!(
        "harn supervisor resume {workflow_id}{config_arg} --state-dir {state_dir} --json"
    );
    let inspect_command = format!(
        "harn supervisor inspect {workflow_id}{config_arg} --state-dir {state_dir} --json"
    );
    WorkflowSupervisorNotificationHint {
        kind: String::from("burin.resume_workflow"),
        workflow_id: workflow_id.to_owned(),
        config_path: config_path_string,
        state_dir,
        resume_command,
        inspect_command,
    }
}