harn-cli 0.8.8

CLI for the Harn programming language — run, test, REPL, format, and lint
Documentation
use std::collections::BTreeMap;
use std::path::Path;
use std::sync::{Arc, Mutex};

use serde_json::Value as JsonValue;
use tokio::sync::broadcast;

use crate::cli::{McpServeArgs, OrchestratorLocalArgs};
use crate::commands::orchestrator::listener::ListenerAuth;

use super::super::oauth_resource::OAuthResourceServer;
use super::types::{
    AbortOnDrop, LogWatcherReadiness, McpListChangeKind, McpLogNotification,
    McpOrchestratorService, McpResourceNotification, McpTaskNotification,
};
use super::watchers::{spawn_log_topic_watchers, start_list_change_watcher};
use harn_serve::FilePromptCatalog;

use super::LOG_NOTIFICATION_CAPACITY;

impl McpOrchestratorService {
    pub(super) fn new(args: &McpServeArgs) -> Result<Self, String> {
        Self::new_local(args.local.clone())
    }

    pub(crate) fn new_local(local: OrchestratorLocalArgs) -> Result<Self, String> {
        let manifest_source = std::fs::read_to_string(&local.config).map_err(|error| {
            format!(
                "failed to read manifest {}: {error}",
                local.config.display()
            )
        })?;
        let auth = ListenerAuth::from_env(false, None)?;
        let oauth = OAuthResourceServer::from_env()?;
        let project_root = local
            .config
            .parent()
            .unwrap_or_else(|| Path::new("."))
            .to_path_buf();
        let prompt_catalog = Arc::new(Mutex::new(FilePromptCatalog::discover(
            &project_root,
            &manifest_source,
        )));
        let manifest_source = Arc::new(Mutex::new(manifest_source));
        let (list_notify_tx, _) = broadcast::channel(64);
        let (resource_notify_tx, _) = broadcast::channel(128);
        let (task_notify_tx, _) = broadcast::channel(64);
        let (log_notify_tx, _) = broadcast::channel(LOG_NOTIFICATION_CAPACITY);
        let list_watcher = start_list_change_watcher(
            project_root,
            local.config.clone(),
            manifest_source.clone(),
            prompt_catalog.clone(),
            list_notify_tx.clone(),
        );
        let log_watchers_ready = Arc::new(LogWatcherReadiness::default());
        let (log_event_log, log_watchers) = spawn_log_topic_watchers(
            &local.state_dir,
            log_notify_tx.clone(),
            log_watchers_ready.clone(),
        );
        Ok(Self {
            config_path: local.config,
            state_dir: local.state_dir,
            manifest_source,
            auth,
            oauth,
            prompt_catalog,
            list_notify_tx,
            resource_notify_tx,
            task_notify_tx,
            log_notify_tx,
            log_event_log,
            log_watchers_ready,
            tasks: Arc::new(Mutex::new(BTreeMap::new())),
            resource_watchers: Arc::new(Mutex::new(BTreeMap::new())),
            _list_watcher: Arc::new(Mutex::new(list_watcher)),
            _log_watchers: Arc::new(AbortOnDrop(log_watchers)),
        })
    }

    pub(super) fn local_args(&self) -> OrchestratorLocalArgs {
        OrchestratorLocalArgs {
            config: self.config_path.clone(),
            state_dir: self.state_dir.clone(),
        }
    }

    pub(crate) fn notify_manifest_reloaded(&self) {
        if let Ok(manifest_source) = std::fs::read_to_string(&self.config_path) {
            self.refresh_manifest_derived_state(manifest_source);
        }
        self.notify_list_changed(&[
            McpListChangeKind::Tools,
            McpListChangeKind::Resources,
            McpListChangeKind::Prompts,
        ]);
    }

    pub(super) fn refresh_manifest_derived_state(&self, manifest_source: String) {
        *self
            .manifest_source
            .lock()
            .expect("manifest source poisoned") = manifest_source.clone();
        let project_root = self
            .config_path
            .parent()
            .unwrap_or_else(|| Path::new("."))
            .to_path_buf();
        let updated = FilePromptCatalog::discover(&project_root, &manifest_source);
        *self.prompt_catalog.lock().expect("prompt catalog poisoned") = updated;
    }

    pub(super) fn notify_list_changed(&self, kinds: &[McpListChangeKind]) {
        for kind in kinds {
            let _ = self.list_notify_tx.send(kind.notification());
        }
    }

    pub(super) fn subscribe_list_notifications(&self) -> broadcast::Receiver<JsonValue> {
        self.list_notify_tx.subscribe()
    }

    pub(super) fn subscribe_resource_notifications(
        &self,
    ) -> broadcast::Receiver<McpResourceNotification> {
        self.resource_notify_tx.subscribe()
    }

    pub(super) fn subscribe_task_notifications(&self) -> broadcast::Receiver<McpTaskNotification> {
        self.task_notify_tx.subscribe()
    }

    pub(super) fn subscribe_log_notifications(&self) -> broadcast::Receiver<McpLogNotification> {
        self.log_notify_tx.subscribe()
    }

    #[cfg(test)]
    pub(super) async fn wait_for_log_watchers_ready(&self) {
        use std::sync::atomic::Ordering;
        loop {
            let notified = self.log_watchers_ready.notify.notified();
            tokio::pin!(notified);
            notified.as_mut().enable();
            let expected = self.log_watchers_ready.expected.load(Ordering::SeqCst);
            if expected > 0 && self.log_watchers_ready.ready.load(Ordering::SeqCst) >= expected {
                return;
            }
            notified.await;
        }
    }
}