harn_cli/commands/orchestrator/harness.rs

use std::collections::{BTreeMap, BTreeSet};
use std::net::SocketAddr;
use std::path::{Path, PathBuf};
use std::sync::Arc;
use std::time::Duration;

use async_trait::async_trait;
use futures::StreamExt;
use harn_vm::clock::{Clock, RealClock};
use serde::{Deserialize, Serialize};
use serde_json::{json, Value as JsonValue};
use time::format_description::well_known::Rfc3339;
use time::OffsetDateTime;
use tokio::sync::{mpsc, oneshot, watch};
use tokio::task::JoinSet;

use harn_vm::event_log::{AnyEventLog, ConsumerId, EventLog};

use super::common::stranded_envelopes;
use super::errors::OrchestratorError;
use super::listener::{
    AdminReloadHandle, AdminReloadRequest, ListenerConfig, ListenerRuntime, RouteConfig,
    TriggerMetricSnapshot,
};
use super::origin_guard::OriginAllowList;
use super::role::OrchestratorRole;
use super::supervisor_state::apply_supervisor_state;
use super::tls::TlsFiles;
use crate::package::{
    self, CollectedManifestTrigger, CollectedTriggerHandler, Manifest,
    ResolvedProviderConnectorConfig, ResolvedProviderConnectorKind, ResolvedTriggerConfig,
};

const LIFECYCLE_TOPIC: &str = "orchestrator.lifecycle";
#[cfg_attr(not(unix), allow(dead_code))]
const MANIFEST_TOPIC: &str = "orchestrator.manifest";
const STATE_SNAPSHOT_FILE: &str = "orchestrator-state.json";
const PENDING_TOPIC: &str = "orchestrator.triggers.pending";
const CRON_TICK_TOPIC: &str = "connectors.cron.tick";
const TEST_INBOX_TASK_RELEASE_FILE_ENV: &str = "HARN_TEST_ORCHESTRATOR_INBOX_TASK_RELEASE_FILE";
const TEST_FAIL_PENDING_PUMP_ENV: &str = "HARN_TEST_ORCHESTRATOR_FAIL_PENDING_PUMP";
const WAITPOINT_SERVICE_INTERVAL: Duration = Duration::from_millis(250);

// ── Public config ─────────────────────────────────────────────────────────────

/// Drain limits applied during graceful shutdown.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub struct DrainConfig {
    pub max_items: usize,
    pub deadline: Duration,
}

impl Default for DrainConfig {
    fn default() -> Self {
        Self {
            max_items: crate::package::default_orchestrator_drain_max_items(),
            deadline: Duration::from_secs(
                crate::package::default_orchestrator_drain_deadline_seconds(),
            ),
        }
    }
}

/// Topic-pump concurrency limit.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub struct PumpConfig {
    pub max_outstanding: usize,
}

impl Default for PumpConfig {
    fn default() -> Self {
        Self {
            max_outstanding: crate::package::default_orchestrator_pump_max_outstanding(),
        }
    }
}

/// Configuration for `OrchestratorHarness::start`.
#[derive(Clone, Debug)]
pub struct OrchestratorConfig {
    pub manifest_path: PathBuf,
    pub state_dir: PathBuf,
    pub bind: SocketAddr,
    pub role: OrchestratorRole,
    pub watch_manifest: bool,
    pub mcp: bool,
    pub mcp_path: String,
    pub mcp_sse_path: String,
    pub mcp_messages_path: String,
    pub tls: Option<TlsFiles>,
    pub shutdown_timeout: Duration,
    pub drain: DrainConfig,
    pub pump: PumpConfig,
    /// When `Some`, installs an observability tracing subscriber.  Tests
    /// should leave this `None` to avoid conflicts with the test runtime.
    pub log_format: Option<harn_vm::observability::otel::LogFormat>,
    /// Time + sleep substrate. Defaults to [`RealClock`]; tests inject
    /// [`harn_vm::clock::PausedClock`] via [`OrchestratorConfig::with_clock`].
    pub clock: Arc<dyn Clock>,
}

impl OrchestratorConfig {
    /// Minimal defaults suitable for integration tests.
    pub fn for_test(manifest_path: PathBuf, state_dir: PathBuf) -> Self {
        Self {
            manifest_path,
            state_dir,
            bind: "127.0.0.1:0".parse().unwrap(),
            role: OrchestratorRole::SingleTenant,
            watch_manifest: false,
            mcp: false,
            mcp_path: "/mcp".to_string(),
            mcp_sse_path: "/sse".to_string(),
            mcp_messages_path: "/messages".to_string(),
            tls: None,
            shutdown_timeout: Duration::from_secs(5),
            drain: DrainConfig::default(),
            pump: PumpConfig::default(),
            log_format: None,
            clock: RealClock::arc(),
        }
    }

    /// Inject a custom [`Clock`]. Production callers leave the default
    /// [`RealClock`]; harness-driven tests pass a
    /// [`harn_vm::clock::PausedClock`] so cron and the dispatcher run on
    /// virtual time.
    pub fn with_clock(mut self, clock: Arc<dyn Clock>) -> Self {
        self.clock = clock;
        self
    }
}
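
// Usage sketch (hedged): `PausedClock::arc()` is an assumed constructor that
// mirrors `RealClock::arc()`; everything else is this module's API as defined
// above.
//
//     let config = OrchestratorConfig::for_test(manifest_path, state_dir)
//         .with_clock(harn_vm::clock::PausedClock::arc()); // assumed ctor
//     let harness = OrchestratorHarness::start(config).await?;
//     assert!(harness.listener_url().starts_with("http"));
//     harness.shutdown(Duration::from_secs(5)).await?;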

// ── Public harness ────────────────────────────────────────────────────────────

/// Summary returned by `OrchestratorHarness::shutdown`.
#[derive(Debug)]
pub struct ShutdownReport {
    #[allow(dead_code)]
    pub timed_out: bool,
}

/// Error type for harness operations.
#[derive(Debug)]
pub struct HarnessError(pub String);

impl std::fmt::Display for HarnessError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.write_str(&self.0)
    }
}

impl From<OrchestratorError> for HarnessError {
    fn from(error: OrchestratorError) -> Self {
        HarnessError(error.to_string())
    }
}

impl From<String> for HarnessError {
    fn from(s: String) -> Self {
        HarnessError(s)
    }
}

/// In-process orchestrator runtime.
///
/// Starts all pumps, connectors, and the HTTP listener on a dedicated
/// background thread running its own Tokio runtime + `LocalSet` (a small
/// multi-thread runtime; see the comment in [`OrchestratorHarness::start`]).
/// Tests hold `Arc<AnyEventLog>` directly and use `EventLog::subscribe()` for
/// event-driven waits instead of polling.
#[allow(dead_code)]
pub struct OrchestratorHarness {
    event_log: Arc<AnyEventLog>,
    listener_url: String,
    local_addr: SocketAddr,
    state_dir: PathBuf,
    admin_reload: AdminReloadHandle,
    shutdown_tx: Arc<watch::Sender<bool>>,
    pump_drain_gate: PumpDrainGate,
    join: Option<std::thread::JoinHandle<()>>,
}
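
// Event-driven wait sketch (hedged): subscribe to the lifecycle topic and wait
// for the startup record instead of polling. The stream item shape matches the
// pump loops below; the `kind` value "startup" is assumed from
// `append_lifecycle_event(&event_log, "startup", ...)`.
//
//     let topic = harn_vm::event_log::Topic::new("orchestrator.lifecycle")?;
//     let mut stream = harness.event_log().subscribe(&topic, None).await?;
//     while let Some(item) = stream.next().await {
//         let (_id, logged) = item?;
//         if logged.kind == "startup" { break; }
//     }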

struct ReadyState {
    event_log: Arc<AnyEventLog>,
    listener_url: String,
    local_addr: SocketAddr,
    state_dir: PathBuf,
    admin_reload: AdminReloadHandle,
}

#[derive(Clone)]
struct PumpDrainGate {
    hold_tx: watch::Sender<bool>,
}

impl PumpDrainGate {
    fn new() -> Self {
        let (hold_tx, _) = watch::channel(false);
        Self { hold_tx }
    }

    fn pause(&self) {
        let _ = self.hold_tx.send(true);
    }

    fn release(&self) {
        let _ = self.hold_tx.send(false);
    }

    fn subscribe(&self) -> watch::Receiver<bool> {
        self.hold_tx.subscribe()
    }
}
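
// Consumer-side sketch of the gate: each pump holds a `watch::Receiver<bool>`
// from `subscribe()` and parks at admission while the gate reads `true`. This
// is the standard tokio watch idiom; the actual blocking lives in
// `wait_for_pump_drain_release` further down the module.
//
//     let mut rx = gate.subscribe();
//     while *rx.borrow() {
//         if rx.changed().await.is_err() { break; } // sender dropped
//     }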

#[allow(dead_code)]
impl OrchestratorHarness {
    /// Start the orchestrator in-process.  Resolves once the HTTP listener
    /// is ready and the startup lifecycle event has been appended.
    pub async fn start(config: OrchestratorConfig) -> Result<Self, HarnessError> {
        let (ready_tx, ready_rx) = oneshot::channel::<Result<ReadyState, OrchestratorError>>();
        let (shutdown_tx, shutdown_rx) = watch::channel(false);
        let shutdown_tx = Arc::new(shutdown_tx);
        let pump_drain_gate = PumpDrainGate::new();
        let task_pump_drain_gate = pump_drain_gate.clone();

        let join = std::thread::spawn(move || {
            // Use a multi-thread runtime so that blocking I/O (e.g. the OTEL
            // SimpleSpanProcessor calling futures::executor::block_on for each
            // span) can proceed on reactor threads while the LocalSet thread is
            // temporarily paused.  A current-thread runtime would deadlock:
            // block_on holds the only thread while the reactor needs that same
            // thread to drive the TCP export.
            let rt = tokio::runtime::Builder::new_multi_thread()
                .worker_threads(2)
                .enable_all()
                .build()
                .expect("failed to build OrchestratorHarness tokio runtime");
            let local = tokio::task::LocalSet::new();
            rt.block_on(local.run_until(orchestrator_task(
                config,
                ready_tx,
                shutdown_rx,
                task_pump_drain_gate,
            )));
        });

        match ready_rx.await {
            Ok(Ok(ready)) => Ok(Self {
                event_log: ready.event_log,
                listener_url: ready.listener_url,
                local_addr: ready.local_addr,
                state_dir: ready.state_dir,
                admin_reload: ready.admin_reload,
                shutdown_tx,
                pump_drain_gate,
                join: Some(join),
            }),
            Ok(Err(error)) => {
                let _ = join.join();
                Err(HarnessError::from(error))
            }
            Err(_) => {
                let _ = join.join();
                Err(HarnessError(
                    "harness thread exited before signaling readiness".to_string(),
                ))
            }
        }
    }

    pub fn listener_url(&self) -> &str {
        &self.listener_url
    }

    pub fn local_addr(&self) -> SocketAddr {
        self.local_addr
    }

    pub fn event_log(&self) -> Arc<AnyEventLog> {
        self.event_log.clone()
    }

    pub fn state_dir(&self) -> &Path {
        &self.state_dir
    }

    /// Returns a handle that can be used to trigger admin-reload from outside
    /// the harness (e.g. on SIGHUP in the CLI wrapper).
    pub fn admin_reload(&self) -> AdminReloadHandle {
        self.admin_reload.clone()
    }

    /// Returns a sender that, when `send(true)` is called, triggers graceful
    /// shutdown.  Use this to wire OS signal handlers in the CLI wrapper.
    pub fn shutdown_trigger(&self) -> Arc<watch::Sender<bool>> {
        self.shutdown_tx.clone()
    }
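
    // Wiring sketch for the CLI wrapper (only tokio's public `ctrl_c` API is
    // used; the surrounding binary setup is assumed):
    //
    //     let trigger = harness.shutdown_trigger();
    //     tokio::spawn(async move {
    //         let _ = tokio::signal::ctrl_c().await;
    //         let _ = trigger.send(true);
    //     });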

    /// Pause topic pumps at the next event admission. Tests can wait for
    /// `pump_drain_waiting` before triggering shutdown.
    pub fn pause_pump_drain(&self) {
        self.pump_drain_gate.pause();
    }

    /// Release topic pumps paused by `pause_pump_drain`.
    pub fn release_pump_drain(&self) {
        self.pump_drain_gate.release();
    }
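
    // Test-flow sketch (hedged; `pump_drain_waiting` is presumably appended by
    // the gate path in `wait_for_pump_drain_release`, which is not shown here):
    //
    //     harness.pause_pump_drain();
    //     // append an event, wait for a `pump_drain_waiting` record...
    //     harness.release_pump_drain();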

    /// Cancel-safe and idempotent; performs the same drain logic as SIGTERM.
    /// The `deadline` argument is currently unused.
    pub async fn shutdown(mut self, _deadline: Duration) -> Result<ShutdownReport, HarnessError> {
        // Signal the background runtime to start graceful shutdown.
        let _ = self.shutdown_tx.send(true);
        // Wait for the background thread to finish (graceful shutdown runs there).
        let join = self.join.take().expect("join handle");
        tokio::task::spawn_blocking(move || join.join())
            .await
            .map_err(|_| HarnessError("spawn_blocking join failed".to_string()))?
            .map_err(|_| HarnessError("harness background thread panicked".to_string()))?;
        Ok(ShutdownReport { timed_out: false })
    }
}

impl Drop for OrchestratorHarness {
    fn drop(&mut self) {
        let _ = self.shutdown_tx.send(true);
        if let Some(join) = self.join.take() {
            let _ = join.join();
        }
    }
}

// ── Internal lifecycle ────────────────────────────────────────────────────────

async fn orchestrator_task(
    config: OrchestratorConfig,
    ready_tx: oneshot::Sender<Result<ReadyState, OrchestratorError>>,
    shutdown_rx: watch::Receiver<bool>,
    pump_drain_gate: PumpDrainGate,
) {
    if let Err(error) = orchestrator_lifecycle(config, ready_tx, shutdown_rx, pump_drain_gate).await
    {
        eprintln!("[harn] orchestrator harness error: {error}");
    }
}

async fn orchestrator_lifecycle(
    config: OrchestratorConfig,
    ready_tx: oneshot::Sender<Result<ReadyState, OrchestratorError>>,
    mut shutdown_rx: watch::Receiver<bool>,
    pump_drain_gate: PumpDrainGate,
) -> Result<(), OrchestratorError> {
    harn_vm::reset_thread_local_state();

    let shutdown_timeout = config.shutdown_timeout;
    let drain_config = config.drain;
    let pump_config = PumpConfig {
        max_outstanding: config.pump.max_outstanding.max(1),
    };

    let state_dir = config.state_dir.clone();
    std::fs::create_dir_all(&state_dir).map_err(|error| {
        format!(
            "failed to create state dir {}: {error}",
            state_dir.display()
        )
    })?;

    let observability = if let Some(log_format) = config.log_format {
        Some(
            harn_vm::observability::otel::ObservabilityGuard::install_orchestrator_subscriber(
                harn_vm::observability::otel::OrchestratorObservabilityConfig {
                    log_format,
                    state_dir: Some(state_dir.clone()),
                },
            )?,
        )
    } else {
        None
    };

    let config_path = absolutize_from_cwd(&config.manifest_path)?;
    let (manifest, manifest_dir) = load_manifest(&config_path)?;
    let drain_config = DrainConfig {
        max_items: drain_config.max_items.max(1),
        deadline: drain_config.deadline,
    };

    let startup_started_at = now_rfc3339()?;
    let (admin_reload, mut reload_rx) = AdminReloadHandle::channel();

    eprintln!("[harn] orchestrator manifest: {}", config_path.display());
    if let Some(name) = manifest
        .package
        .as_ref()
        .and_then(|package| package.name.as_deref())
    {
        eprintln!("[harn] orchestrator package: {name}");
    }
    eprintln!(
        "[harn] orchestrator role: {} ({})",
        config.role.as_str(),
        config.role.registry_mode()
    );
    eprintln!("[harn] orchestrator state dir: {}", state_dir.display());
    tracing::info!(
        component = "orchestrator",
        trace_id = "",
        role = config.role.as_str(),
        state_dir = %state_dir.display(),
        manifest = %config_path.display(),
        "orchestrator starting"
    );

    let workspace_root = manifest_dir.clone();
    let mut vm = config
        .role
        .build_vm(&workspace_root, &manifest_dir, &state_dir)?;

    let event_log = harn_vm::event_log::active_event_log()
        .ok_or_else(|| "event log was not installed during VM initialization".to_string())?;
    let event_log_description = event_log.describe();
    let tenant_store = if config.role == OrchestratorRole::MultiTenant {
        let store = harn_vm::TenantStore::load(&state_dir)?;
        let active_tenants = store
            .list()
            .into_iter()
            .filter(|tenant| tenant.status == harn_vm::TenantStatus::Active)
            .collect::<Vec<_>>();
        eprintln!(
            "[harn] tenants loaded: {} active ({})",
            active_tenants.len(),
            active_tenants
                .iter()
                .map(|tenant| tenant.scope.id.0.as_str())
                .collect::<Vec<_>>()
                .join(", ")
        );
        Some(Arc::new(store))
    } else {
        None
    };
    eprintln!(
        "[harn] event log: {} {}",
        event_log_description.backend,
        event_log_description
            .location
            .as_ref()
            .map(|path| path.display().to_string())
            .unwrap_or_else(|| "<memory>".to_string())
    );

    let secret_namespace = secret_namespace_for(&manifest_dir);
    let secret_chain_display = configured_secret_chain_display();
    let secret_chain = harn_vm::secrets::configured_default_chain(secret_namespace.clone())
        .map_err(|error| format!("failed to configure secret providers: {error}"))?;
    if secret_chain.providers().is_empty() {
        return Err("secret provider chain resolved to zero providers"
            .to_string()
            .into());
    }
    eprintln!(
        "[harn] secret providers: {} (namespace {})",
        secret_chain_display, secret_namespace
    );
    let secret_provider: Arc<dyn harn_vm::secrets::SecretProvider> = Arc::new(secret_chain);

    let extensions = package::load_runtime_extensions(&config_path);
    let metrics_registry = Arc::new(harn_vm::MetricsRegistry::default());
    harn_vm::install_active_metrics_registry(metrics_registry.clone());
    let collected_triggers = package::collect_manifest_triggers(&mut vm, &extensions)
        .await
        .map_err(|error| format!("failed to collect manifest triggers: {error}"))?;
    package::install_collected_manifest_triggers(&collected_triggers).await?;
    apply_supervisor_state(&state_dir).await?;
    eprintln!(
        "[harn] registered triggers ({}): {}",
        collected_triggers.len(),
        format_trigger_summary(&collected_triggers)
    );

    let binding_versions = live_manifest_binding_versions();
    let route_configs = build_route_configs(&collected_triggers, &binding_versions)?;
    let mut connector_runtime = initialize_connectors(
        &collected_triggers,
        event_log.clone(),
        secret_provider.clone(),
        metrics_registry.clone(),
        &extensions.provider_connectors,
        config.clock.clone(),
    )
    .await?;
    let route_configs = attach_route_connectors(
        route_configs,
        &connector_runtime.registry,
        &extensions.provider_connectors,
    )?;
    let connector_clients = connector_runtime.registry.client_map().await;
    harn_vm::install_active_connector_clients(connector_clients);
    eprintln!(
        "[harn] registered connectors ({}): {}",
        connector_runtime.providers.len(),
        connector_runtime.providers.join(", ")
    );
    eprintln!(
        "[harn] activated connectors: {}",
        format_activation_summary(&connector_runtime.activations)
    );
    let (mcp_router, mcp_service) = if config.mcp {
        validate_mcp_paths(
            &config.mcp_path,
            &config.mcp_sse_path,
            &config.mcp_messages_path,
        )?;
        if !has_orchestrator_api_keys_configured() && !has_mcp_oauth_configured() {
            return Err(OrchestratorError::Serve(
                "--mcp requires HARN_ORCHESTRATOR_API_KEYS or HARN_MCP_OAUTH_AUTHORIZATION_SERVERS so the embedded MCP management surface is authenticated"
                    .to_string(),
            ));
        }
        let service = Arc::new(
            crate::commands::mcp::serve::McpOrchestratorService::new_local(
                crate::cli::OrchestratorLocalArgs {
                    config: config_path.clone(),
                    state_dir: state_dir.clone(),
                },
            )?,
        );
        let router = crate::commands::mcp::serve::http_router_for_service(
            service.clone(),
            config.mcp_path.clone(),
            config.mcp_sse_path.clone(),
            config.mcp_messages_path.clone(),
        );
        eprintln!(
            "[harn] embedded MCP server mounted at {} (legacy SSE {}, messages {})",
            config.mcp_path, config.mcp_sse_path, config.mcp_messages_path
        );
        (Some(router), Some(service))
    } else {
        (None, None)
    };

    let dispatcher = harn_vm::Dispatcher::with_event_log_and_metrics(
        vm,
        event_log.clone(),
        Some(metrics_registry.clone()),
    );
    let mut pending_pumps = vec![(
        PENDING_TOPIC.to_string(),
        spawn_pending_pump(
            event_log.clone(),
            dispatcher.clone(),
            pump_config,
            metrics_registry.clone(),
            pump_drain_gate.clone(),
            PENDING_TOPIC,
        )?,
    )];
    let mut inbox_pumps = vec![(
        harn_vm::TRIGGER_INBOX_ENVELOPES_TOPIC.to_string(),
        spawn_inbox_pump(
            event_log.clone(),
            dispatcher.clone(),
            pump_config,
            metrics_registry.clone(),
            harn_vm::TRIGGER_INBOX_ENVELOPES_TOPIC,
        )?,
    )];
    if let Some(store) = tenant_store.as_ref() {
        for tenant in store
            .list()
            .into_iter()
            .filter(|tenant| tenant.status == harn_vm::TenantStatus::Active)
        {
            let pending_topic = harn_vm::tenant_topic(
                &tenant.scope.id,
                &harn_vm::event_log::Topic::new(PENDING_TOPIC)
                    .map_err(|error| error.to_string())?,
            )
            .map_err(|error| error.to_string())?;
            pending_pumps.push((
                pending_topic.as_str().to_string(),
                spawn_pending_pump(
                    event_log.clone(),
                    dispatcher.clone(),
                    pump_config,
                    metrics_registry.clone(),
                    pump_drain_gate.clone(),
                    pending_topic.as_str(),
                )?,
            ));
            let inbox_topic = harn_vm::tenant_topic(
                &tenant.scope.id,
                &harn_vm::event_log::Topic::new(harn_vm::TRIGGER_INBOX_ENVELOPES_TOPIC)
                    .map_err(|error| error.to_string())?,
            )
            .map_err(|error| error.to_string())?;
            inbox_pumps.push((
                inbox_topic.as_str().to_string(),
                spawn_inbox_pump(
                    event_log.clone(),
                    dispatcher.clone(),
                    pump_config,
                    metrics_registry.clone(),
                    inbox_topic.as_str(),
                )?,
            ));
        }
    }
    let cron_pump = spawn_cron_pump(
        event_log.clone(),
        dispatcher.clone(),
        pump_config,
        metrics_registry.clone(),
        pump_drain_gate.clone(),
    )?;
    let waitpoint_pump = spawn_waitpoint_resume_pump(
        event_log.clone(),
        dispatcher.clone(),
        pump_config,
        metrics_registry.clone(),
        pump_drain_gate.clone(),
    )?;
    let waitpoint_cancel_pump = spawn_waitpoint_cancel_pump(
        event_log.clone(),
        dispatcher.clone(),
        pump_config,
        metrics_registry.clone(),
        pump_drain_gate.clone(),
    )?;
    let waitpoint_sweeper = spawn_waitpoint_sweeper(dispatcher.clone());

    let listener = ListenerRuntime::start(ListenerConfig {
        bind: config.bind,
        tls: config.tls.clone(),
        event_log: event_log.clone(),
        secrets: secret_provider.clone(),
        allowed_origins: OriginAllowList::from_manifest(&manifest.orchestrator.allowed_origins),
        max_body_bytes: ListenerConfig::max_body_bytes_or_default(
            manifest.orchestrator.max_body_bytes,
        ),
        metrics_registry: metrics_registry.clone(),
        admin_reload: Some(admin_reload.clone()),
        mcp_router,
        routes: route_configs,
        tenant_store: tenant_store.clone(),
        session_store: Some(Arc::new(harn_vm::SessionStore::new(event_log.clone()))),
    })
    .await?;
    let local_bind = listener.local_addr();
    let listener_metrics = listener.trigger_metrics();
    let mut live_manifest = manifest;
    let mut live_triggers = collected_triggers;
    let _manifest_watcher = if config.watch_manifest {
        Some(spawn_manifest_watcher(
            config_path.clone(),
            admin_reload.clone(),
        )?)
    } else {
        None
    };
    connector_runtime.activations = connector_runtime
        .registry
        .activate_all(&connector_runtime.trigger_registry)
        .await
        .map_err(|error| error.to_string())?;
    eprintln!(
        "[harn] activated connectors: {}",
        format_activation_summary(&connector_runtime.activations)
    );

    listener.mark_ready();
    eprintln!("[harn] HTTP listener ready on {}", listener.url());
    tracing::info!(
        component = "orchestrator",
        trace_id = "",
        listener_url = %listener.url(),
        "HTTP listener ready"
    );

    write_state_snapshot(
        &state_dir.join(STATE_SNAPSHOT_FILE),
        &ServeStateSnapshot {
            status: "running".to_string(),
            role: config.role.as_str().to_string(),
            bind: local_bind.to_string(),
            listener_url: listener.url(),
            manifest_path: config_path.display().to_string(),
            state_dir: state_dir.display().to_string(),
            started_at: startup_started_at.clone(),
            stopped_at: None,
            secret_provider_chain: secret_chain_display.clone(),
            event_log_backend: event_log_description.backend.to_string(),
            event_log_location: event_log_description
                .location
                .as_ref()
                .map(|path| path.display().to_string()),
            triggers: trigger_state_snapshots(&live_triggers, &listener_metrics),
            connectors: connector_runtime.providers.clone(),
            activations: connector_runtime
                .activations
                .iter()
                .map(|activation| ConnectorActivationSnapshot {
                    provider: activation.provider.as_str().to_string(),
                    binding_count: activation.binding_count,
                })
                .collect(),
        },
    )?;

    append_lifecycle_event(
        &event_log,
        "startup",
        json!({
            "bind": local_bind.to_string(),
            "manifest": config_path.display().to_string(),
            "role": config.role.as_str(),
            "state_dir": state_dir.display().to_string(),
            "trigger_count": live_triggers.len(),
            "connector_count": connector_runtime.providers.len(),
            "tls_enabled": listener.scheme() == "https",
            "shutdown_timeout_secs": shutdown_timeout.as_secs(),
            "drain_max_items": drain_config.max_items,
            "drain_deadline_secs": drain_config.deadline.as_secs(),
            "pump_max_outstanding": pump_config.max_outstanding,
        }),
    )
    .await?;

    let stranded = stranded_envelopes(&event_log, Duration::ZERO).await?;
    if !stranded.is_empty() {
        eprintln!(
            "[harn] startup found {} stranded inbox envelope(s); inspect with `harn orchestrator queue` and recover explicitly with `harn orchestrator recover --dry-run --envelope-age ...`",
            stranded.len()
        );
    }
    append_lifecycle_event(
        &event_log,
        "startup_stranded_envelopes",
        json!({
            "count": stranded.len(),
        }),
    )
    .await?;

    // Signal that the harness is ready.
    let _ = ready_tx.send(Ok(ReadyState {
        event_log: event_log.clone(),
        listener_url: listener.url(),
        local_addr: local_bind,
        state_dir: state_dir.clone(),
        admin_reload: admin_reload.clone(),
    }));

    // Wait for shutdown trigger or admin reload requests.
    let mut ctx = RuntimeCtx {
        role: config.role,
        config_path: &config_path,
        state_dir: &state_dir,
        bind: local_bind,
        startup_started_at: &startup_started_at,
        event_log: &event_log,
        event_log_description: &event_log_description,
        secret_chain_display: &secret_chain_display,
        listener: &listener,
        connectors: &mut connector_runtime,
        live_manifest: &mut live_manifest,
        live_triggers: &mut live_triggers,
        secret_provider: &secret_provider,
        metrics_registry: &metrics_registry,
        mcp_service: mcp_service.as_ref(),
        clock: config.clock.clone(),
        reload_rx: &mut reload_rx,
    };

    loop {
        tokio::select! {
            changed = shutdown_rx.changed() => {
                if changed.is_err() || *shutdown_rx.borrow() {
                    break;
                }
            }
            Some(request) = ctx.reload_rx.recv() => {
                handle_reload_request(&mut ctx, request).await?;
            }
        }
    }

    listener.mark_not_ready();
    let shutdown = graceful_shutdown(
        GracefulShutdownCtx {
            role: config.role,
            bind: local_bind,
            listener_url: listener.url(),
            config_path: &config_path,
            state_dir: &state_dir,
            startup_started_at: &startup_started_at,
            event_log: &event_log,
            event_log_description: &event_log_description,
            secret_chain_display: &secret_chain_display,
            triggers: &live_triggers,
            connectors: &connector_runtime,
            shutdown_timeout,
            drain_config,
        },
        listener,
        dispatcher,
        pending_pumps,
        cron_pump,
        inbox_pumps,
        waitpoint_pump,
        waitpoint_cancel_pump,
        waitpoint_sweeper,
    )
    .await;

    if let Some(obs) = observability {
        if let Err(error) = obs.shutdown() {
            if shutdown.is_ok() {
                return Err(OrchestratorError::Serve(error));
            }
            eprintln!("[harn] observability shutdown warning: {error}");
        }
    }
    harn_vm::clear_active_metrics_registry();
    shutdown
}

// ── Internal types ────────────────────────────────────────────────────────────

struct ConnectorRuntime {
    registry: harn_vm::ConnectorRegistry,
    trigger_registry: harn_vm::TriggerRegistry,
    handles: Vec<harn_vm::connectors::ConnectorHandle>,
    providers: Vec<String>,
    activations: Vec<harn_vm::ActivationHandle>,
    #[cfg_attr(not(unix), allow(dead_code))]
    provider_overrides: Vec<ResolvedProviderConnectorConfig>,
}

#[cfg_attr(not(unix), allow(dead_code))]
#[derive(Clone, Debug, Default, Serialize)]
struct ManifestReloadSummary {
    added: Vec<String>,
    modified: Vec<String>,
    removed: Vec<String>,
    unchanged: Vec<String>,
}

#[derive(Clone, Copy, Debug, PartialEq, Eq)]
enum PumpMode {
    Running,
    Draining(PumpDrainRequest),
}

#[derive(Clone, Copy, Debug, PartialEq, Eq)]
struct PumpDrainRequest {
    up_to: u64,
    config: DrainConfig,
    deadline: tokio::time::Instant,
}

#[derive(Clone, Copy, Debug, PartialEq, Eq)]
enum PumpDrainStopReason {
    Drained,
    MaxItems,
    Deadline,
    Error,
}

impl PumpDrainStopReason {
    fn as_str(self) -> &'static str {
        match self {
            Self::Drained => "drained",
            Self::MaxItems => "max_items",
            Self::Deadline => "deadline",
            Self::Error => "error",
        }
    }
}

#[derive(Clone, Copy, Debug, Default)]
struct PumpStats {
    last_seen: u64,
    processed: u64,
}

#[derive(Clone, Copy, Debug)]
struct PumpDrainProgress {
    request: PumpDrainRequest,
    start_seen: u64,
}

#[derive(Clone, Copy, Debug)]
struct PumpDrainReport {
    stats: PumpStats,
    drain_items: u64,
    remaining_queued: u64,
    outstanding_tasks: usize,
    stop_reason: PumpDrainStopReason,
}

impl PumpDrainReport {
    fn truncated(self) -> bool {
        self.remaining_queued > 0 || self.outstanding_tasks > 0
    }
}

struct PumpHandle {
    mode_tx: watch::Sender<PumpMode>,
    join: tokio::task::JoinHandle<Result<PumpDrainReport, OrchestratorError>>,
}

impl PumpHandle {
    async fn drain(
        self,
        log: &Arc<AnyEventLog>,
        topic_name: &str,
        up_to: u64,
        config: DrainConfig,
        overall_deadline: tokio::time::Instant,
    ) -> Result<PumpDrainReport, OrchestratorError> {
        let drain_deadline = std::cmp::min(
            tokio::time::Instant::now() + config.deadline,
            overall_deadline,
        );
        let _ = self.mode_tx.send(PumpMode::Draining(PumpDrainRequest {
            up_to,
            config,
            deadline: drain_deadline,
        }));
        append_pump_lifecycle_event(
            log,
            "pump_drain_started",
            json!({
                "topic": topic_name,
                "up_to": up_to,
                "max_items": config.max_items,
                "drain_deadline_ms": config.deadline.as_millis(),
            }),
        )
        .await?;
        match self.join.await {
            Ok(result) => result,
            Err(error) => Err(format!("pump task join failed: {error}").into()),
        }
    }
}

struct WaitpointSweepHandle {
    stop_tx: watch::Sender<bool>,
    join: tokio::task::JoinHandle<Result<(), OrchestratorError>>,
}

impl WaitpointSweepHandle {
    async fn shutdown(self) -> Result<(), OrchestratorError> {
        let _ = self.stop_tx.send(true);
        match self.join.await {
            Ok(result) => result,
            Err(error) => Err(format!("waitpoint sweeper join failed: {error}").into()),
        }
    }
}

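/// Shape of a `trigger_event` record on a pending topic, as decoded by
/// `spawn_pending_pump` below. Illustrative payload (values hypothetical):
/// `{"trigger_id": "cron-1", "binding_version": 3, "event": { ... }}`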
#[derive(Debug, Deserialize)]
struct PendingTriggerRecord {
    trigger_id: String,
    binding_version: u32,
    event: harn_vm::TriggerEvent,
}

// ── Pump spawn functions ──────────────────────────────────────────────────────

fn spawn_pending_pump(
    event_log: Arc<harn_vm::event_log::AnyEventLog>,
    dispatcher: harn_vm::Dispatcher,
    pump_config: PumpConfig,
    metrics_registry: Arc<harn_vm::MetricsRegistry>,
    pump_drain_gate: PumpDrainGate,
    topic_name: &str,
) -> Result<PumpHandle, OrchestratorError> {
    let topic = harn_vm::event_log::Topic::new(topic_name).map_err(|error| error.to_string())?;
    spawn_topic_pump(
        event_log,
        topic,
        pump_config,
        metrics_registry,
        pump_drain_gate,
        move |logged| {
            let dispatcher = dispatcher.clone();
            async move {
                if pending_pump_test_should_fail() {
                    return Err("test pending pump failure".to_string().into());
                }
                if logged.kind != "trigger_event" {
                    return Ok(false);
                }
                let record: PendingTriggerRecord = serde_json::from_value(logged.payload)
                    .map_err(|error| format!("failed to decode pending trigger event: {error}"))?;
                dispatcher
                    .enqueue_targeted_with_headers(
                        Some(record.trigger_id),
                        Some(record.binding_version),
                        record.event,
                        Some(&logged.headers),
                    )
                    .await
                    .map_err(|error| format!("failed to enqueue pending trigger event: {error}"))?;
                Ok(true)
            }
        },
    )
}

fn spawn_cron_pump(
    event_log: Arc<harn_vm::event_log::AnyEventLog>,
    dispatcher: harn_vm::Dispatcher,
    pump_config: PumpConfig,
    metrics_registry: Arc<harn_vm::MetricsRegistry>,
    pump_drain_gate: PumpDrainGate,
) -> Result<PumpHandle, OrchestratorError> {
    let topic =
        harn_vm::event_log::Topic::new(CRON_TICK_TOPIC).map_err(|error| error.to_string())?;
    spawn_topic_pump(
        event_log,
        topic,
        pump_config,
        metrics_registry,
        pump_drain_gate,
        move |logged| {
            let dispatcher = dispatcher.clone();
            async move {
                if logged.kind != "trigger_event" {
                    return Ok(false);
                }
                let event: harn_vm::TriggerEvent = serde_json::from_value(logged.payload)
                    .map_err(|error| format!("failed to decode cron trigger event: {error}"))?;
                let trigger_id = match &event.provider_payload {
                    harn_vm::ProviderPayload::Known(
                        harn_vm::triggers::event::KnownProviderPayload::Cron(payload),
                    ) => payload.cron_id.clone(),
                    _ => None,
                };
                dispatcher
                    .enqueue_targeted_with_headers(trigger_id, None, event, Some(&logged.headers))
                    .await
                    .map_err(|error| format!("failed to enqueue cron trigger event: {error}"))?;
                Ok(true)
            }
        },
    )
}

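/// Inbox pump. Unlike the serial `spawn_topic_pump`, envelopes are dispatched
/// concurrently on a `JoinSet`, bounded by `pump_config.max_outstanding`, and
/// the cursor is acked at admission rather than at dispatch completion.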
fn spawn_inbox_pump(
    event_log: Arc<harn_vm::event_log::AnyEventLog>,
    dispatcher: harn_vm::Dispatcher,
    pump_config: PumpConfig,
    metrics_registry: Arc<harn_vm::MetricsRegistry>,
    topic_name: &str,
) -> Result<PumpHandle, OrchestratorError> {
    let topic = harn_vm::event_log::Topic::new(topic_name).map_err(|error| error.to_string())?;
    let consumer = pump_consumer_id(&topic)?;
    let inbox_task_release_file = inbox_task_test_release_file();
    let (mode_tx, mut mode_rx) = watch::channel(PumpMode::Running);
    let join = tokio::task::spawn_local(async move {
        let start_from = event_log
            .consumer_cursor(&topic, &consumer)
            .await
            .map_err(|error| format!("failed to read consumer cursor for {topic}: {error}"))?
            .or(event_log
                .latest(&topic)
                .await
                .map_err(|error| format!("failed to read topic head {topic}: {error}"))?);
        let mut stream = event_log
            .clone()
            .subscribe(&topic, start_from)
            .await
            .map_err(|error| format!("failed to subscribe topic {topic}: {error}"))?;
        let mut stats = PumpStats {
            last_seen: start_from.unwrap_or(0),
            processed: 0,
        };
        record_pump_metrics(&metrics_registry, &event_log, &topic, stats.last_seen, 0).await?;
        let mut drain_progress = None;
        let mut tasks = JoinSet::new();

        loop {
            if let Some(progress) = drain_progress {
                if let Some(report) = maybe_finish_pump_drain(stats, progress, tasks.len()) {
                    return Ok(report);
                }
            }

            let deadline = drain_progress.map(|progress| progress.request.deadline);
            let mut deadline_wait = Box::pin(async move {
                if let Some(deadline) = deadline {
                    tokio::time::sleep_until(deadline).await;
                } else {
                    std::future::pending::<()>().await;
                }
            });

            tokio::select! {
                changed = mode_rx.changed() => {
                    if changed.is_err() {
                        break;
                    }
                    if let PumpMode::Draining(request) = *mode_rx.borrow() {
                        drain_progress.get_or_insert(PumpDrainProgress {
                            request,
                            start_seen: stats.last_seen,
                        });
                    }
                }
                _ = &mut deadline_wait => {
                    if let Some(progress) = drain_progress {
                        return Ok(pump_drain_report(
                            stats,
                            progress.start_seen,
                            progress.request.up_to,
                            tasks.len(),
                            PumpDrainStopReason::Deadline,
                        ));
                    }
                }
                joined = tasks.join_next(), if !tasks.is_empty() => {
                    match joined {
                        Some(Ok(())) => {
                            record_pump_metrics(
                                &metrics_registry,
                                &event_log,
                                &topic,
                                stats.last_seen,
                                tasks.len(),
                            )
                            .await?;
                        }
                        Some(Err(error)) => {
                            return Err(format!("inbox dispatch task join failed: {error}").into());
                        }
                        None => {}
                    }
                }
                _ = tokio::time::sleep(Duration::from_millis(25)), if tasks.len() >= pump_config.max_outstanding => {
                    record_pump_metrics(&metrics_registry, &event_log, &topic, stats.last_seen, tasks.len()).await?;
                }
                received = stream.next(), if tasks.len() < pump_config.max_outstanding => {
                    let Some(received) = received else {
                        break;
                    };
                    let (event_id, logged) = received
                        .map_err(|error| format!("topic pump read failed for {topic}: {error}"))?;
                    if logged.kind != "event_ingested" {
                        stats.last_seen = event_id;
                        event_log
                            .ack(&topic, &consumer, event_id)
                            .await
                            .map_err(|error| format!("failed to ack topic pump cursor for {topic}: {error}"))?;
                        record_pump_metrics(&metrics_registry, &event_log, &topic, stats.last_seen, tasks.len()).await?;
                        continue;
                    }
                    append_pump_lifecycle_event(
                        &event_log,
                        "pump_received",
                        json!({
                            "topic": topic.as_str(),
                            "event_log_id": event_id,
                            "outstanding": tasks.len(),
                            "max_outstanding": pump_config.max_outstanding,
                        }),
                    )
                    .await?;
                    let envelope: harn_vm::triggers::dispatcher::InboxEnvelope =
                        serde_json::from_value(logged.payload)
                            .map_err(|error| format!("failed to decode dispatcher inbox event: {error}"))?;
                    let trigger_id = envelope.trigger_id.clone();
                    let binding_version = envelope.binding_version;
                    let trigger_event_id = envelope.event.id.0.clone();
                    let parent_headers = logged.headers.clone();
                    append_pump_lifecycle_event(
                        &event_log,
                        "pump_eligible",
                        json!({
                            "topic": topic.as_str(),
                            "event_log_id": event_id,
                            "trigger_id": trigger_id.clone(),
                            "binding_version": binding_version,
                            "trigger_event_id": trigger_event_id,
                        }),
                    )
                    .await?;
                    metrics_registry.record_orchestrator_pump_admission_delay(
                        topic.as_str(),
                        admission_delay(logged.occurred_at_ms),
                    );
                    append_pump_lifecycle_event(
                        &event_log,
                        "pump_admitted",
                        json!({
                            "topic": topic.as_str(),
                            "event_log_id": event_id,
                            "outstanding_after_admit": tasks.len() + 1,
                            "max_outstanding": pump_config.max_outstanding,
                            "trigger_id": trigger_id.clone(),
                            "binding_version": binding_version,
                            "trigger_event_id": trigger_event_id,
                        }),
                    )
                    .await?;
                    let dispatcher = dispatcher.clone();
                    let task_event_log = event_log.clone();
                    let task_topic = topic.as_str().to_string();
                    let inbox_task_release_file = inbox_task_release_file.clone();
                    tasks.spawn_local(async move {
                        if let Some(path) = inbox_task_release_file.as_ref() {
                            wait_for_test_release_file(path).await;
                        }
                        let _ = append_pump_lifecycle_event(
                            &task_event_log,
                            "pump_dispatch_started",
                            json!({
                                "topic": task_topic.clone(),
                                "event_log_id": event_id,
                                "trigger_id": trigger_id,
                                "binding_version": binding_version,
                                "trigger_event_id": trigger_event_id,
                            }),
                        )
                        .await;
                        let result = dispatcher
                            .dispatch_inbox_envelope_with_parent_headers(
                                envelope,
                                &parent_headers,
                            )
                            .await;
                        let (status, error_message) = match result {
                            Ok(_) => ("completed", None),
                            Err(error) => {
                                let message = error.to_string();
                                eprintln!("[harn] inbox dispatch warning: {message}");
                                ("failed", Some(message))
                            }
                        };
                        let _ = append_pump_lifecycle_event(
                            &task_event_log,
                            "pump_dispatch_completed",
                            json!({
                                "topic": task_topic,
                                "event_log_id": event_id,
                                "status": status,
                                "error": error_message,
                            }),
                        )
                        .await;
                    });
                    stats.last_seen = event_id;
                    stats.processed += 1;
                    event_log
                        .ack(&topic, &consumer, event_id)
                        .await
                        .map_err(|error| format!("failed to ack topic pump cursor for {topic}: {error}"))?;
                    append_pump_lifecycle_event(
                        &event_log,
                        "pump_acked",
                        json!({
                            "topic": topic.as_str(),
                            "event_log_id": event_id,
                            "cursor": event_id,
                        }),
                    )
                    .await?;
                    record_pump_metrics(&metrics_registry, &event_log, &topic, stats.last_seen, tasks.len()).await?;
                }
            }
        }

        while let Some(joined) = tasks.join_next().await {
            joined.map_err(|error| format!("inbox dispatch task join failed: {error}"))?;
            record_pump_metrics(
                &metrics_registry,
                &event_log,
                &topic,
                stats.last_seen,
                tasks.len(),
            )
            .await?;
        }

        Ok(drain_progress
            .map(|progress| {
                pump_drain_report(
                    stats,
                    progress.start_seen,
                    progress.request.up_to,
                    0,
                    PumpDrainStopReason::Drained,
                )
            })
            .unwrap_or_else(|| PumpDrainReport {
                stats,
                drain_items: 0,
                remaining_queued: 0,
                outstanding_tasks: 0,
                stop_reason: PumpDrainStopReason::Drained,
            }))
    });
    Ok(PumpHandle { mode_tx, join })
}

fn spawn_waitpoint_resume_pump(
    event_log: Arc<harn_vm::event_log::AnyEventLog>,
    dispatcher: harn_vm::Dispatcher,
    pump_config: PumpConfig,
    metrics_registry: Arc<harn_vm::MetricsRegistry>,
    pump_drain_gate: PumpDrainGate,
) -> Result<PumpHandle, OrchestratorError> {
    let topic = harn_vm::event_log::Topic::new(harn_vm::WAITPOINT_RESUME_TOPIC)
        .map_err(|error| error.to_string())?;
    spawn_topic_pump(
        event_log,
        topic,
        pump_config,
        metrics_registry,
        pump_drain_gate,
        move |logged| {
            let dispatcher = dispatcher.clone();
            async move {
                harn_vm::process_waitpoint_resume_event(&dispatcher, logged)
                    .await
                    .map_err(OrchestratorError::from)
            }
        },
    )
}

fn spawn_waitpoint_cancel_pump(
    event_log: Arc<harn_vm::event_log::AnyEventLog>,
    dispatcher: harn_vm::Dispatcher,
    pump_config: PumpConfig,
    metrics_registry: Arc<harn_vm::MetricsRegistry>,
    pump_drain_gate: PumpDrainGate,
) -> Result<PumpHandle, OrchestratorError> {
    let topic = harn_vm::event_log::Topic::new(harn_vm::TRIGGER_CANCEL_REQUESTS_TOPIC)
        .map_err(|error| error.to_string())?;
    spawn_topic_pump(
        event_log,
        topic,
        pump_config,
        metrics_registry,
        pump_drain_gate,
        move |logged| {
            let dispatcher = dispatcher.clone();
            async move {
                if logged.kind != "dispatch_cancel_requested" {
                    return Ok(false);
                }
                harn_vm::service_waitpoints_once(&dispatcher, None)
                    .await
                    .map_err(|error| {
                        OrchestratorError::Serve(format!(
                            "failed to service waitpoints on cancel: {error}"
                        ))
                    })?;
                Ok(true)
            }
        },
    )
}

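/// Periodically services waitpoints (every `WAITPOINT_SERVICE_INTERVAL`) until
/// `WaitpointSweepHandle::shutdown` flips the stop channel.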
fn spawn_waitpoint_sweeper(dispatcher: harn_vm::Dispatcher) -> WaitpointSweepHandle {
    let (stop_tx, mut stop_rx) = watch::channel(false);
    let join = tokio::task::spawn_local(async move {
        let mut interval = tokio::time::interval(WAITPOINT_SERVICE_INTERVAL);
        interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
        loop {
            tokio::select! {
                changed = stop_rx.changed() => {
                    if changed.is_err() || *stop_rx.borrow() {
                        break;
                    }
                }
                _ = interval.tick() => {
                    harn_vm::service_waitpoints_once(&dispatcher, None)
                        .await
                        .map_err(|error| format!("failed to service waitpoints on sweep: {error}"))?;
                }
            }
        }
        Ok(())
    });
    WaitpointSweepHandle { stop_tx, join }
}

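/// Generic serial pump: subscribes at the saved consumer cursor (or the topic
/// head), runs `process` on each record, and acks the cursor whether or not
/// the record was handled. `process` returns `Ok(true)` when it handled the
/// record (counted in `PumpStats::processed`) and `Ok(false)` to skip.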
fn spawn_topic_pump<F, Fut>(
    event_log: Arc<harn_vm::event_log::AnyEventLog>,
    topic: harn_vm::event_log::Topic,
    _pump_config: PumpConfig,
    metrics_registry: Arc<harn_vm::MetricsRegistry>,
    pump_drain_gate: PumpDrainGate,
    process: F,
) -> Result<PumpHandle, OrchestratorError>
where
    F: Fn(harn_vm::event_log::LogEvent) -> Fut + 'static,
    Fut: std::future::Future<Output = Result<bool, OrchestratorError>> + 'static,
{
    let consumer = pump_consumer_id(&topic)?;
    let mut pump_drain_gate_rx = pump_drain_gate.subscribe();
    let (mode_tx, mut mode_rx) = watch::channel(PumpMode::Running);
    let join = tokio::task::spawn_local(async move {
        let start_from = event_log
            .consumer_cursor(&topic, &consumer)
            .await
            .map_err(|error| format!("failed to read consumer cursor for {topic}: {error}"))?
            .or(event_log
                .latest(&topic)
                .await
                .map_err(|error| format!("failed to read topic head for {topic}: {error}"))?);
        let mut stream = event_log
            .clone()
            .subscribe(&topic, start_from)
            .await
            .map_err(|error| format!("failed to subscribe to topic {topic}: {error}"))?;
        let mut stats = PumpStats {
            last_seen: start_from.unwrap_or(0),
            processed: 0,
        };
        record_pump_metrics(&metrics_registry, &event_log, &topic, stats.last_seen, 0).await?;
        let mut drain_progress = None;
        loop {
            if let Some(progress) = drain_progress {
                if let Some(report) = maybe_finish_pump_drain(stats, progress, 0) {
                    return Ok(report);
                }
            }
            let deadline = drain_progress.map(|progress| progress.request.deadline);
            let mut deadline_wait = Box::pin(async move {
                if let Some(deadline) = deadline {
                    tokio::time::sleep_until(deadline).await;
                } else {
                    std::future::pending::<()>().await;
                }
            });
            tokio::select! {
                changed = mode_rx.changed() => {
                    if changed.is_err() {
                        break;
                    }
                    if let PumpMode::Draining(request) = *mode_rx.borrow() {
                        drain_progress.get_or_insert(PumpDrainProgress {
                            request,
                            start_seen: stats.last_seen,
                        });
                    }
                }
                _ = &mut deadline_wait => {
                    if let Some(progress) = drain_progress {
                        return Ok(pump_drain_report(
                            stats,
                            progress.start_seen,
                            progress.request.up_to,
                            0,
                            PumpDrainStopReason::Deadline,
                        ));
                    }
                }
                received = stream.next() => {
                    let Some(received) = received else {
                        break;
                    };
                    let (event_id, logged) = received
                        .map_err(|error| format!("topic pump read failed for {topic}: {error}"))?;
                    record_pump_metrics(&metrics_registry, &event_log, &topic, stats.last_seen, 1).await?;
                    metrics_registry.record_orchestrator_pump_admission_delay(
                        topic.as_str(),
                        admission_delay(logged.occurred_at_ms),
                    );
                    wait_for_pump_drain_release(
                        &event_log,
                        &topic,
                        event_id,
                        &mut pump_drain_gate_rx,
                    )
                    .await?;
                    let handled = process(logged).await?;
                    stats.last_seen = event_id;
                    if handled {
                        stats.processed += 1;
                    }
                    event_log
                        .ack(&topic, &consumer, event_id)
                        .await
                        .map_err(|error| format!("failed to ack topic pump cursor for {topic}: {error}"))?;
                    record_pump_metrics(&metrics_registry, &event_log, &topic, stats.last_seen, 0).await?;
                }
            }
        }
        Ok(drain_progress
            .map(|progress| {
                pump_drain_report(
                    stats,
                    progress.start_seen,
                    progress.request.up_to,
                    0,
                    PumpDrainStopReason::Drained,
                )
            })
            .unwrap_or_else(|| PumpDrainReport {
                stats,
                drain_items: 0,
                remaining_queued: 0,
                outstanding_tasks: 0,
                stop_reason: PumpDrainStopReason::Drained,
            }))
    });
    Ok(PumpHandle { mode_tx, join })
}

// ── Graceful shutdown ─────────────────────────────────────────────────────────

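/// Drives the orderly shutdown sequence: announce `draining`, stop the
/// listener and connectors, drain each topic pump, drain the dispatcher,
/// append a `stopped` lifecycle event, flush the event log, and persist a
/// final state snapshot. All phases share one deadline derived from
/// `ctx.shutdown_timeout`, so a slow phase shrinks the budget of the rest.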
#[allow(clippy::too_many_arguments)]
async fn graceful_shutdown(
    ctx: GracefulShutdownCtx<'_>,
    listener: ListenerRuntime,
    dispatcher: harn_vm::Dispatcher,
    pending_pumps: Vec<(String, PumpHandle)>,
    cron_pump: PumpHandle,
    inbox_pumps: Vec<(String, PumpHandle)>,
    waitpoint_pump: PumpHandle,
    waitpoint_cancel_pump: PumpHandle,
    waitpoint_sweeper: WaitpointSweepHandle,
) -> Result<(), OrchestratorError> {
    eprintln!("[harn] signal received, starting graceful shutdown...");
    tracing::info!(
        component = "orchestrator",
        trace_id = "",
        shutdown_timeout_secs = ctx.shutdown_timeout.as_secs(),
        "signal received, starting graceful shutdown"
    );
    let listener_in_flight = listener
        .trigger_metrics()
        .into_values()
        .map(|metrics| metrics.in_flight)
        .sum::<u64>();
    let dispatcher_before = dispatcher.snapshot();
    append_lifecycle_event(
        ctx.event_log,
        "draining",
        json!({
            "bind": ctx.bind.to_string(),
            "role": ctx.role.as_str(),
            "status": "draining",
            "http_in_flight": listener_in_flight,
            "dispatcher_in_flight": dispatcher_before.in_flight,
            "dispatcher_retry_queue_depth": dispatcher_before.retry_queue_depth,
            "dispatcher_dlq_depth": dispatcher_before.dlq_depth,
            "shutdown_timeout_secs": ctx.shutdown_timeout.as_secs(),
            "drain_max_items": ctx.drain_config.max_items,
            "drain_deadline_secs": ctx.drain_config.deadline.as_secs(),
        }),
    )
    .await?;

    let deadline = tokio::time::Instant::now() + ctx.shutdown_timeout;
    let listener_metrics = listener.shutdown(remaining_budget(deadline)).await?;
    for handle in &ctx.connectors.handles {
        let connector = handle.lock().await;
        if let Err(error) = connector.shutdown(remaining_budget(deadline)).await {
            eprintln!(
                "[harn] connector {} shutdown warning: {error}",
                connector.provider_id().as_str()
            );
        }
    }

    let mut pending_processed = 0;
    for (topic_name, pump) in pending_pumps {
        let stats =
            drain_pump_best_effort(ctx.event_log, &topic_name, pump, ctx.drain_config, deadline)
                .await?;
        pending_processed += stats.stats.processed;
        emit_drain_truncated(ctx.event_log, &topic_name, stats, ctx.drain_config).await?;
    }
    let cron_stats = drain_pump_best_effort(
        ctx.event_log,
        CRON_TICK_TOPIC,
        cron_pump,
        ctx.drain_config,
        deadline,
    )
    .await?;
    emit_drain_truncated(ctx.event_log, CRON_TICK_TOPIC, cron_stats, ctx.drain_config).await?;
    let mut inbox_processed = 0;
    for (topic_name, pump) in inbox_pumps {
        let stats =
            drain_pump_best_effort(ctx.event_log, &topic_name, pump, ctx.drain_config, deadline)
                .await?;
        inbox_processed += stats.stats.processed;
        emit_drain_truncated(ctx.event_log, &topic_name, stats, ctx.drain_config).await?;
    }
    let waitpoint_stats = waitpoint_pump
        .drain(
            ctx.event_log,
            harn_vm::WAITPOINT_RESUME_TOPIC,
            topic_latest_id(ctx.event_log, harn_vm::WAITPOINT_RESUME_TOPIC).await?,
            ctx.drain_config,
            deadline,
        )
        .await?;
    emit_drain_truncated(
        ctx.event_log,
        harn_vm::WAITPOINT_RESUME_TOPIC,
        waitpoint_stats,
        ctx.drain_config,
    )
    .await?;
    let waitpoint_cancel_stats = waitpoint_cancel_pump
        .drain(
            ctx.event_log,
            harn_vm::TRIGGER_CANCEL_REQUESTS_TOPIC,
            topic_latest_id(ctx.event_log, harn_vm::TRIGGER_CANCEL_REQUESTS_TOPIC).await?,
            ctx.drain_config,
            deadline,
        )
        .await?;
    emit_drain_truncated(
        ctx.event_log,
        harn_vm::TRIGGER_CANCEL_REQUESTS_TOPIC,
        waitpoint_cancel_stats,
        ctx.drain_config,
    )
    .await?;
    waitpoint_sweeper.shutdown().await?;
    let drain_report = dispatcher
        .drain(remaining_budget(deadline))
        .await
        .map_err(|error| format!("failed to drain dispatcher: {error}"))?;

    let stopped_at = now_rfc3339()?;
    let timed_out = !drain_report.drained;
    if timed_out {
        dispatcher.shutdown();
    }
    append_lifecycle_event(
        ctx.event_log,
        "stopped",
        json!({
            "bind": ctx.bind.to_string(),
            "role": ctx.role.as_str(),
            "status": "stopped",
            "http_in_flight": listener_in_flight,
            "dispatcher_in_flight": drain_report.in_flight,
            "dispatcher_retry_queue_depth": drain_report.retry_queue_depth,
            "dispatcher_dlq_depth": drain_report.dlq_depth,
            "pending_events_drained": pending_processed,
            "cron_events_drained": cron_stats.stats.processed,
            "inbox_events_drained": inbox_processed,
            "waitpoint_events_drained": waitpoint_stats.stats.processed,
            "waitpoint_cancel_events_drained": waitpoint_cancel_stats.stats.processed,
            "timed_out": timed_out,
        }),
    )
    .await?;
    ctx.event_log
        .flush()
        .await
        .map_err(|error| format!("failed to flush event log: {error}"))?;

    write_state_snapshot(
        &ctx.state_dir.join(STATE_SNAPSHOT_FILE),
        &ServeStateSnapshot {
            status: "stopped".to_string(),
            role: ctx.role.as_str().to_string(),
            bind: ctx.bind.to_string(),
            listener_url: ctx.listener_url.clone(),
            manifest_path: ctx.config_path.display().to_string(),
            state_dir: ctx.state_dir.display().to_string(),
            started_at: ctx.startup_started_at.to_string(),
            stopped_at: Some(stopped_at),
            secret_provider_chain: ctx.secret_chain_display.to_string(),
            event_log_backend: ctx.event_log_description.backend.to_string(),
            event_log_location: ctx
                .event_log_description
                .location
                .as_ref()
                .map(|path| path.display().to_string()),
            triggers: trigger_state_snapshots(ctx.triggers, &listener_metrics),
            connectors: ctx.connectors.providers.clone(),
            activations: ctx
                .connectors
                .activations
                .iter()
                .map(|activation| ConnectorActivationSnapshot {
                    provider: activation.provider.as_str().to_string(),
                    binding_count: activation.binding_count,
                })
                .collect(),
        },
    )?;

    if timed_out {
        eprintln!(
            "[harn] graceful shutdown timed out with {} dispatches and {} retry waits remaining",
            drain_report.in_flight, drain_report.retry_queue_depth
        );
    }
    eprintln!("[harn] graceful shutdown complete");
    tracing::info!(
        component = "orchestrator",
        trace_id = "",
        "graceful shutdown complete"
    );
    Ok(())
}

struct GracefulShutdownCtx<'a> {
    role: OrchestratorRole,
    bind: SocketAddr,
    listener_url: String,
    config_path: &'a Path,
    state_dir: &'a Path,
    startup_started_at: &'a str,
    event_log: &'a Arc<AnyEventLog>,
    event_log_description: &'a harn_vm::event_log::EventLogDescription,
    secret_chain_display: &'a str,
    triggers: &'a [CollectedManifestTrigger],
    connectors: &'a ConnectorRuntime,
    shutdown_timeout: Duration,
    drain_config: DrainConfig,
}

// ── Manifest reload ───────────────────────────────────────────────────────────

struct RuntimeCtx<'a> {
    role: OrchestratorRole,
    config_path: &'a Path,
    state_dir: &'a Path,
    bind: SocketAddr,
    startup_started_at: &'a str,
    event_log: &'a Arc<AnyEventLog>,
    event_log_description: &'a harn_vm::event_log::EventLogDescription,
    secret_chain_display: &'a str,
    listener: &'a ListenerRuntime,
    connectors: &'a mut ConnectorRuntime,
    live_manifest: &'a mut Manifest,
    live_triggers: &'a mut Vec<CollectedManifestTrigger>,
    secret_provider: &'a Arc<dyn harn_vm::secrets::SecretProvider>,
    metrics_registry: &'a Arc<harn_vm::MetricsRegistry>,
    mcp_service: Option<&'a Arc<crate::commands::mcp::serve::McpOrchestratorService>>,
    clock: Arc<dyn Clock>,
    #[cfg_attr(not(unix), allow(dead_code))]
    reload_rx: &'a mut mpsc::UnboundedReceiver<AdminReloadRequest>,
}

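/// Applies one admin reload request end to end. On success it refreshes the
/// embedded MCP service, rewrites the running state snapshot, and records a
/// `reload_succeeded` manifest event; on failure it records `reload_failed`.
/// Either way the requester's `response_tx`, when present, receives the
/// outcome and the serve loop keeps running.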
#[cfg_attr(not(unix), allow(dead_code))]
async fn handle_reload_request(
    ctx: &mut RuntimeCtx<'_>,
    request: AdminReloadRequest,
) -> Result<(), OrchestratorError> {
    let source = request.source.clone();
    match reload_manifest(ctx).await {
        Ok(summary) => {
            if let Some(mcp_service) = ctx.mcp_service {
                mcp_service.notify_manifest_reloaded();
            }
            write_running_state_snapshot(ctx)?;
            append_manifest_event(
                ctx.event_log,
                "reload_succeeded",
                json!({
                    "source": source,
                    "summary": summary,
                }),
            )
            .await?;
            eprintln!(
                "[harn] manifest reload ({source}) applied: +{} ~{} -{}",
                summary.added.len(),
                summary.modified.len(),
                summary.removed.len()
            );
            if let Some(response_tx) = request.response_tx {
                let _ = response_tx.send(serde_json::to_value(&summary).map_err(|error| {
                    OrchestratorError::Serve(format!("failed to encode reload summary: {error}"))
                }));
            }
        }
        Err(error) => {
            eprintln!("[harn] manifest reload ({source}) failed: {error}");
            append_manifest_event(
                ctx.event_log,
                "reload_failed",
                json!({
                    "source": source,
                    "error": error.to_string(),
                }),
            )
            .await?;
            if let Some(response_tx) = request.response_tx {
                let _ = response_tx.send(Err(error));
            }
        }
    }
    Ok(())
}

#[cfg_attr(not(unix), allow(dead_code))]
fn write_running_state_snapshot(ctx: &RuntimeCtx<'_>) -> Result<(), OrchestratorError> {
    let listener_metrics = ctx.listener.trigger_metrics();
    write_state_snapshot(
        &ctx.state_dir.join(STATE_SNAPSHOT_FILE),
        &ServeStateSnapshot {
            status: "running".to_string(),
            role: ctx.role.as_str().to_string(),
            bind: ctx.bind.to_string(),
            listener_url: ctx.listener.url(),
            manifest_path: ctx.config_path.display().to_string(),
            state_dir: ctx.state_dir.display().to_string(),
            started_at: ctx.startup_started_at.to_string(),
            stopped_at: None,
            secret_provider_chain: ctx.secret_chain_display.to_string(),
            event_log_backend: ctx.event_log_description.backend.to_string(),
            event_log_location: ctx
                .event_log_description
                .location
                .as_ref()
                .map(|path| path.display().to_string()),
            triggers: trigger_state_snapshots(ctx.live_triggers, &listener_metrics),
            connectors: ctx.connectors.providers.clone(),
            activations: ctx
                .connectors
                .activations
                .iter()
                .map(|activation| ConnectorActivationSnapshot {
                    provider: activation.provider.as_str().to_string(),
                    binding_count: activation.binding_count,
                })
                .collect(),
        },
    )
}

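/// Reloads the manifest with rollback on partial failure. Triggers are
/// collected from a freshly built VM, connectors are rebuilt only when their
/// fingerprint map changed, and the new routes are installed on the
/// listener. If route construction or installation fails, the previous
/// triggers and routes are reinstalled before the error is surfaced, so the
/// listener never serves a half-applied manifest.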
#[cfg_attr(not(unix), allow(dead_code))]
async fn reload_manifest(
    ctx: &mut RuntimeCtx<'_>,
) -> Result<ManifestReloadSummary, OrchestratorError> {
    let (manifest, manifest_dir) = load_manifest(ctx.config_path)?;
    let mut vm = ctx
        .role
        .build_vm(&manifest_dir, &manifest_dir, ctx.state_dir)?;
    let extensions = package::load_runtime_extensions(ctx.config_path);
    let collected_triggers = package::collect_manifest_triggers(&mut vm, &extensions)
        .await
        .map_err(|error| format!("failed to collect manifest triggers: {error}"))?;
    let summary = summarize_manifest_reload(ctx.live_triggers, &collected_triggers);
    let connector_reload =
        connector_reload_fingerprint_map(ctx.live_triggers, &ctx.connectors.provider_overrides)
            != connector_reload_fingerprint_map(
                &collected_triggers,
                &extensions.provider_connectors,
            );
    let next_connector_runtime = if connector_reload {
        let mut runtime = initialize_connectors(
            &collected_triggers,
            ctx.event_log.clone(),
            ctx.secret_provider.clone(),
            ctx.metrics_registry.clone(),
            &extensions.provider_connectors,
            ctx.clock.clone(),
        )
        .await?;
        runtime.activations = runtime
            .registry
            .activate_all(&runtime.trigger_registry)
            .await
            .map_err(|error| error.to_string())?;
        Some(runtime)
    } else {
        None
    };
    let previous_manifest = ctx.live_manifest.clone();
    let previous_triggers = ctx.live_triggers.clone();
    package::install_collected_manifest_triggers(&collected_triggers).await?;
    apply_supervisor_state(ctx.state_dir).await?;
    let binding_versions = live_manifest_binding_versions();
    let route_registry = next_connector_runtime
        .as_ref()
        .map(|runtime| &runtime.registry)
        .unwrap_or(&ctx.connectors.registry);
    let route_overrides = next_connector_runtime
        .as_ref()
        .map(|runtime| runtime.provider_overrides.as_slice())
        .unwrap_or(ctx.connectors.provider_overrides.as_slice());
    let route_configs = match build_route_configs(&collected_triggers, &binding_versions)
        .and_then(|routes| attach_route_connectors(routes, route_registry, route_overrides))
    {
        Ok(routes) => routes,
        Err(error) => {
            rollback_manifest_reload(ctx, &previous_manifest, &previous_triggers)
                .await
                .map_err(|rollback| format!("{error}; rollback failed: {rollback}"))?;
            return Err(error);
        }
    };
    if let Err(error) = ctx.listener.reload_routes(route_configs) {
        rollback_manifest_reload(ctx, &previous_manifest, &previous_triggers)
            .await
            .map_err(|rollback| format!("{error}; rollback failed: {rollback}"))?;
        return Err(error);
    }
    if let Some(runtime) = next_connector_runtime {
        let previous_handles = ctx.connectors.handles.clone();
        let connector_clients = runtime.registry.client_map().await;
        harn_vm::install_active_connector_clients(connector_clients);
        *ctx.connectors = runtime;
        for handle in previous_handles {
            let connector = handle.lock().await;
            if let Err(error) = connector.shutdown(Duration::from_secs(5)).await {
                eprintln!(
                    "[harn] connector {} reload shutdown warning: {error}",
                    connector.provider_id().as_str()
                );
            }
        }
    }
    *ctx.live_manifest = manifest;
    *ctx.live_triggers = collected_triggers;
    Ok(summary)
}

#[cfg_attr(not(unix), allow(dead_code))]
async fn rollback_manifest_reload(
    ctx: &mut RuntimeCtx<'_>,
    previous_manifest: &Manifest,
    previous_triggers: &[CollectedManifestTrigger],
) -> Result<(), OrchestratorError> {
    package::install_collected_manifest_triggers(previous_triggers).await?;
    apply_supervisor_state(ctx.state_dir).await?;
    let binding_versions = live_manifest_binding_versions();
    let route_configs = build_route_configs(previous_triggers, &binding_versions)?;
    let route_configs = attach_route_connectors(
        route_configs,
        &ctx.connectors.registry,
        &ctx.connectors.provider_overrides,
    )?;
    ctx.listener.reload_routes(route_configs)?;
    *ctx.live_manifest = previous_manifest.clone();
    *ctx.live_triggers = previous_triggers.to_vec();
    Ok(())
}

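/// Diffs two trigger sets by definition fingerprint, classifying every
/// trigger id as added, removed, modified, or unchanged.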
#[cfg_attr(not(unix), allow(dead_code))]
fn summarize_manifest_reload(
    current: &[CollectedManifestTrigger],
    next: &[CollectedManifestTrigger],
) -> ManifestReloadSummary {
    let current_map = trigger_fingerprint_map(current, true);
    let next_map = trigger_fingerprint_map(next, true);
    let mut summary = ManifestReloadSummary::default();
    let ids: BTreeSet<String> = current_map.keys().chain(next_map.keys()).cloned().collect();
    for id in ids {
        match (current_map.get(&id), next_map.get(&id)) {
            (None, Some(_)) => summary.added.push(id),
            (Some(_), None) => summary.removed.push(id),
            (Some(left), Some(right)) if left == right => summary.unchanged.push(id),
            (Some(_), Some(_)) => summary.modified.push(id),
            (None, None) => {}
        }
    }
    summary
}

#[cfg_attr(not(unix), allow(dead_code))]
fn trigger_fingerprint_map(
    triggers: &[CollectedManifestTrigger],
    include_http_managed: bool,
) -> BTreeMap<String, String> {
    triggers
        .iter()
        .filter(|trigger| include_http_managed || !is_http_managed_trigger(trigger))
        .map(|trigger| {
            let spec = package::manifest_trigger_binding_spec(trigger.clone());
            (trigger.config.id.clone(), spec.definition_fingerprint)
        })
        .collect()
}

#[cfg_attr(not(unix), allow(dead_code))]
fn connector_reload_fingerprint_map(
    triggers: &[CollectedManifestTrigger],
    provider_overrides: &[ResolvedProviderConnectorConfig],
) -> BTreeMap<String, Vec<String>> {
    let mut by_provider = BTreeMap::<String, Vec<String>>::new();
    for trigger in triggers {
        let provider = trigger.config.provider.as_str().to_string();
        if !connector_owns_ingress(&provider, provider_overrides)
            && matches!(
                trigger.config.kind,
                crate::package::TriggerKind::Webhook | crate::package::TriggerKind::A2aPush
            )
        {
            continue;
        }
        let spec = package::manifest_trigger_binding_spec(trigger.clone());
        by_provider
            .entry(provider)
            .or_default()
            .push(spec.definition_fingerprint);
    }
    for override_config in provider_overrides {
        by_provider
            .entry(override_config.id.as_str().to_string())
            .or_default()
            .push(provider_connector_fingerprint(override_config));
    }
    for fingerprints in by_provider.values_mut() {
        fingerprints.sort();
    }
    by_provider
}

#[cfg_attr(not(unix), allow(dead_code))]
fn provider_connector_fingerprint(config: &ResolvedProviderConnectorConfig) -> String {
    match &config.connector {
        ResolvedProviderConnectorKind::RustBuiltin => format!(
            "{}::builtin@{}",
            config.id.as_str(),
            config.manifest_dir.display()
        ),
        ResolvedProviderConnectorKind::Harn { module } => format!(
            "{}::harn:{}@{}",
            config.id.as_str(),
            module,
            config.manifest_dir.display()
        ),
        ResolvedProviderConnectorKind::Invalid(message) => format!(
            "{}::invalid:{}@{}",
            config.id.as_str(),
            message,
            config.manifest_dir.display()
        ),
    }
}

#[cfg_attr(not(unix), allow(dead_code))]
fn is_http_managed_trigger(trigger: &CollectedManifestTrigger) -> bool {
    matches!(
        trigger.config.kind,
        crate::package::TriggerKind::Webhook | crate::package::TriggerKind::A2aPush
    )
}

// ── Connector helpers ─────────────────────────────────────────────────────────

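/// Builds the connector runtime for a trigger set: registers every trigger
/// binding, groups trigger kinds by provider, and gives each provider its
/// manifest-declared override connector, a built-in, or a placeholder.
/// Package-backed providers without a Harn connector override are rejected
/// here, so the misconfiguration surfaces at startup or reload rather than
/// on first dispatch.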
async fn initialize_connectors(
    triggers: &[CollectedManifestTrigger],
    event_log: Arc<harn_vm::event_log::AnyEventLog>,
    secrets: Arc<dyn harn_vm::secrets::SecretProvider>,
    metrics: Arc<harn_vm::MetricsRegistry>,
    provider_overrides: &[ResolvedProviderConnectorConfig],
    clock: Arc<dyn Clock>,
) -> Result<ConnectorRuntime, OrchestratorError> {
    let mut registry = harn_vm::ConnectorRegistry::default();
    let mut trigger_registry = harn_vm::TriggerRegistry::default();
    let mut grouped_kinds: BTreeMap<harn_vm::ProviderId, BTreeSet<String>> = BTreeMap::new();

    for trigger in triggers {
        let binding = trigger_binding_for(&trigger.config)?;
        grouped_kinds
            .entry(binding.provider.clone())
            .or_default()
            .insert(binding.kind.as_str().to_string());
        trigger_registry.register(binding);
    }

    let ctx = harn_vm::ConnectorCtx {
        inbox: Arc::new(
            harn_vm::InboxIndex::new(event_log.clone(), metrics.clone())
                .await
                .map_err(|error| error.to_string())?,
        ),
        event_log,
        secrets,
        metrics,
        rate_limiter: Arc::new(harn_vm::RateLimiterFactory::default()),
    };

    let mut providers = Vec::new();
    let mut handles = Vec::new();
    for (provider, kinds) in grouped_kinds {
        let provider_name = provider.as_str().to_string();
        if let Some(connector) = connector_override_for(&provider, provider_overrides).await? {
            registry.remove(&provider);
            registry
                .register(connector)
                .map_err(|error| error.to_string())?;
        }
        if registry.get(&provider).is_none() {
            if provider_requires_harn_connector(provider.as_str()) {
                return Err(format!(
                    "provider '{}' is package-backed; add [[providers]] id = \"{}\" with \
                     connector = {{ harn = \"...\" }} to the manifest",
                    provider.as_str(),
                    provider.as_str()
                )
                .into());
            }
            let connector = connector_for(&provider, kinds, clock.clone());
            registry
                .register(connector)
                .map_err(|error| error.to_string())?;
        }
        let handle = registry
            .get(&provider)
            .ok_or_else(|| format!("connector registry lost provider '{}'", provider.as_str()))?;
        handle
            .lock()
            .await
            .init(ctx.clone())
            .await
            .map_err(|error| error.to_string())?;
        handles.push(handle.clone());
        providers.push(provider_name);
    }

    Ok(ConnectorRuntime {
        registry,
        trigger_registry,
        handles,
        providers,
        activations: Vec::new(),
        provider_overrides: provider_overrides.to_vec(),
    })
}

fn trigger_binding_for(
    config: &ResolvedTriggerConfig,
) -> Result<harn_vm::TriggerBinding, OrchestratorError> {
    Ok(harn_vm::TriggerBinding {
        provider: config.provider.clone(),
        kind: harn_vm::TriggerKind::from(trigger_kind_name(config.kind)),
        binding_id: config.id.clone(),
        dedupe_key: config.dedupe_key.clone(),
        dedupe_retention_days: config.retry.retention_days,
        config: connector_binding_config(config)?,
    })
}

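/// Encodes the kind-specific portion of a trigger config into the JSON the
/// connector expects. Cron configs are serialized directly; the other kinds
/// wrap the shared `match`/`secrets` fields alongside a kind-named payload.
/// For a webhook trigger the result is shaped like the following sketch
/// (field contents depend on the manifest):
///
/// ```text
/// { "match": {...}, "secrets": {...}, "webhook": {...} }
/// ```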
fn connector_binding_config(
    config: &ResolvedTriggerConfig,
) -> Result<JsonValue, OrchestratorError> {
    match config.kind {
        crate::package::TriggerKind::Cron => {
            serde_json::to_value(&config.kind_specific).map_err(|error| {
                OrchestratorError::Serve(format!(
                    "failed to encode cron trigger config '{}': {error}",
                    config.id
                ))
            })
        }
        crate::package::TriggerKind::Webhook => Ok(serde_json::json!({
            "match": config.match_,
            "secrets": config.secrets,
            "webhook": config.kind_specific,
        })),
        crate::package::TriggerKind::Poll => Ok(serde_json::json!({
            "match": config.match_,
            "secrets": config.secrets,
            "poll": config.kind_specific,
        })),
        crate::package::TriggerKind::Stream => Ok(serde_json::json!({
            "match": config.match_,
            "secrets": config.secrets,
            "stream": config.kind_specific,
            "window": config.window,
        })),
        crate::package::TriggerKind::A2aPush => Ok(serde_json::json!({
            "match": config.match_,
            "secrets": config.secrets,
            "a2a_push": a2a_push_connector_config(&config.kind_specific)?,
        })),
        _ => Ok(JsonValue::Null),
    }
}

fn a2a_push_connector_config(
    kind_specific: &BTreeMap<String, toml::Value>,
) -> Result<JsonValue, OrchestratorError> {
    if let Some(nested) = kind_specific.get("a2a_push") {
        return serde_json::to_value(nested).map_err(|error| {
            OrchestratorError::Serve(format!("failed to encode a2a_push trigger config: {error}"))
        });
    }
    let filtered = kind_specific
        .iter()
        .filter(|(key, _)| key.as_str() != "path")
        .map(|(key, value)| (key.clone(), value.clone()))
        .collect::<BTreeMap<_, _>>();
    serde_json::to_value(filtered).map_err(|error| {
        OrchestratorError::Serve(format!("failed to encode a2a_push trigger config: {error}"))
    })
}

fn connector_for(
    provider: &harn_vm::ProviderId,
    kinds: BTreeSet<String>,
    clock: Arc<dyn Clock>,
) -> Box<dyn harn_vm::Connector> {
    match provider.as_str() {
        "cron" => Box::new(harn_vm::CronConnector::with_clock(clock)),
        _ => Box::new(PlaceholderConnector::new(provider.clone(), kinds)),
    }
}

async fn connector_override_for(
    provider: &harn_vm::ProviderId,
    provider_overrides: &[ResolvedProviderConnectorConfig],
) -> Result<Option<Box<dyn harn_vm::Connector>>, OrchestratorError> {
    let Some(override_config) = provider_overrides
        .iter()
        .find(|entry| entry.id == *provider)
    else {
        return Ok(None);
    };
    match &override_config.connector {
        ResolvedProviderConnectorKind::RustBuiltin => Ok(None),
        ResolvedProviderConnectorKind::Invalid(message) => {
            Err(OrchestratorError::Serve(message.clone()))
        }
        ResolvedProviderConnectorKind::Harn { module } => {
            let module_path =
                harn_vm::resolve_module_import_path(&override_config.manifest_dir, module);
            let connector = harn_vm::HarnConnector::load(&module_path)
                .await
                .map_err(|error| {
                    format!(
                        "failed to load Harn connector '{}' for provider '{}': {error}",
                        module_path.display(),
                        provider.as_str()
                    )
                })?;
            Ok(Some(Box::new(connector)))
        }
    }
}

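/// Builds a listener route for every trigger that exposes one, resolving the
/// active manifest binding version and rejecting duplicate paths so two
/// triggers can never claim the same route.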
fn build_route_configs(
    triggers: &[CollectedManifestTrigger],
    binding_versions: &BTreeMap<String, u32>,
) -> Result<Vec<RouteConfig>, OrchestratorError> {
    let mut seen_paths = BTreeSet::new();
    let mut routes = Vec::new();
    for trigger in triggers {
        let Some(binding_version) = binding_versions.get(&trigger.config.id).copied() else {
            return Err(format!(
                "trigger registry is missing active manifest binding '{}'",
                trigger.config.id
            )
            .into());
        };
        if let Some(route) = RouteConfig::from_trigger(trigger, binding_version)? {
            if !seen_paths.insert(route.path.clone()) {
                return Err(format!(
                    "trigger route '{}' is configured more than once",
                    route.path
                )
                .into());
            }
            routes.push(route);
        }
    }
    Ok(routes)
}

fn attach_route_connectors(
    routes: Vec<RouteConfig>,
    registry: &harn_vm::ConnectorRegistry,
    provider_overrides: &[ResolvedProviderConnectorConfig],
) -> Result<Vec<RouteConfig>, OrchestratorError> {
    routes
        .into_iter()
        .map(|mut route| {
            if route.connector_ingress
                || connector_owns_ingress(route.provider.as_str(), provider_overrides)
            {
                route.connector = Some(registry.get(&route.provider).ok_or_else(|| {
                    format!(
                        "connector registry is missing provider '{}'",
                        route.provider.as_str()
                    )
                })?);
            }
            Ok(route)
        })
        .collect()
}

fn connector_owns_ingress(
    provider: &str,
    provider_overrides: &[ResolvedProviderConnectorConfig],
) -> bool {
    provider_overrides.iter().any(|entry| {
        entry.id.as_str() == provider
            && matches!(entry.connector, ResolvedProviderConnectorKind::Harn { .. })
    })
}

fn provider_requires_harn_connector(provider: &str) -> bool {
    harn_vm::provider_metadata(provider).is_some_and(|metadata| {
        matches!(
            metadata.runtime,
            harn_vm::ProviderRuntimeMetadata::Placeholder
        )
    })
}

fn live_manifest_binding_versions() -> BTreeMap<String, u32> {
    let mut versions = BTreeMap::new();
    for binding in harn_vm::snapshot_trigger_bindings() {
        if binding.source != harn_vm::TriggerBindingSource::Manifest {
            continue;
        }
        if binding.state == harn_vm::TriggerState::Terminated {
            continue;
        }
        versions
            .entry(binding.id)
            .and_modify(|current: &mut u32| *current = (*current).max(binding.version))
            .or_insert(binding.version);
    }
    versions
}

fn trigger_state_snapshots(
    triggers: &[CollectedManifestTrigger],
    listener_metrics: &BTreeMap<String, TriggerMetricSnapshot>,
) -> Vec<TriggerStateSnapshot> {
    let bindings_by_id = harn_vm::snapshot_trigger_bindings()
        .into_iter()
        .filter(|binding| binding.source == harn_vm::TriggerBindingSource::Manifest)
        .fold(
            BTreeMap::<String, harn_vm::TriggerBindingSnapshot>::new(),
            |mut acc, binding| {
                match acc.get(&binding.id) {
                    Some(current) if current.version >= binding.version => {}
                    _ => {
                        acc.insert(binding.id.clone(), binding);
                    }
                }
                acc
            },
        );

    triggers
        .iter()
        .map(|trigger| {
            let runtime = bindings_by_id.get(&trigger.config.id);
            let metrics = listener_metrics.get(&trigger.config.id);
            TriggerStateSnapshot {
                id: trigger.config.id.clone(),
                provider: trigger.config.provider.as_str().to_string(),
                kind: trigger_kind_name(trigger.config.kind).to_string(),
                handler: handler_kind(&trigger.handler).to_string(),
                version: runtime.map(|binding| binding.version),
                state: runtime.map(|binding| binding.state.as_str().to_string()),
                received: metrics.map(|value| value.received).unwrap_or(0),
                dispatched: metrics.map(|value| value.dispatched).unwrap_or(0),
                failed: metrics.map(|value| value.failed).unwrap_or(0),
                in_flight: metrics.map(|value| value.in_flight).unwrap_or(0),
            }
        })
        .collect()
}

// ── Misc helpers ──────────────────────────────────────────────────────────────

fn format_trigger_summary(triggers: &[CollectedManifestTrigger]) -> String {
    if triggers.is_empty() {
        return "none".to_string();
    }
    triggers
        .iter()
        .map(|trigger| {
            format!(
                "{} [{}:{} -> {}]",
                trigger.config.id,
                trigger.config.provider.as_str(),
                trigger_kind_name(trigger.config.kind),
                handler_kind(&trigger.handler)
            )
        })
        .collect::<Vec<_>>()
        .join(", ")
}

fn format_activation_summary(activations: &[harn_vm::ActivationHandle]) -> String {
    if activations.is_empty() {
        return "none".to_string();
    }
    activations
        .iter()
        .map(|activation| {
            format!(
                "{}({})",
                activation.provider.as_str(),
                activation.binding_count
            )
        })
        .collect::<Vec<_>>()
        .join(", ")
}

fn handler_kind(handler: &CollectedTriggerHandler) -> &'static str {
    match handler {
        CollectedTriggerHandler::Local { .. } => "local",
        CollectedTriggerHandler::A2a { .. } => "a2a",
        CollectedTriggerHandler::Worker { .. } => "worker",
        CollectedTriggerHandler::Persona { .. } => "persona",
    }
}

fn trigger_kind_name(kind: crate::package::TriggerKind) -> &'static str {
    match kind {
        crate::package::TriggerKind::Webhook => "webhook",
        crate::package::TriggerKind::Cron => "cron",
        crate::package::TriggerKind::Poll => "poll",
        crate::package::TriggerKind::Stream => "stream",
        crate::package::TriggerKind::Predicate => "predicate",
        crate::package::TriggerKind::A2aPush => "a2a-push",
    }
}

fn has_orchestrator_api_keys_configured() -> bool {
    std::env::var("HARN_ORCHESTRATOR_API_KEYS")
        .ok()
        .is_some_and(|value| value.split(',').any(|segment| !segment.trim().is_empty()))
}

fn has_mcp_oauth_configured() -> bool {
    std::env::var("HARN_MCP_OAUTH_AUTHORIZATION_SERVERS")
        .ok()
        .is_some_and(|value| value.split(',').any(|segment| !segment.trim().is_empty()))
}

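/// Validates the three embedded MCP paths: each must start with '/', must
/// not be bare '/', must avoid the reserved listener paths, and all three
/// must be distinct. For example `--mcp-path /mcp` passes, while
/// `--mcp-path /metrics` is rejected as a reserved listener path.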
fn validate_mcp_paths(
    path: &str,
    sse_path: &str,
    messages_path: &str,
) -> Result<(), OrchestratorError> {
    let reserved = [
        "/health",
        "/healthz",
        "/readyz",
        "/metrics",
        "/admin/reload",
        "/acp",
    ];
    let mut seen = BTreeSet::new();
    for (label, value) in [
        ("--mcp-path", path),
        ("--mcp-sse-path", sse_path),
        ("--mcp-messages-path", messages_path),
    ] {
        if !value.starts_with('/') {
            return Err(format!("{label} must start with '/'").into());
        }
        if value == "/" {
            return Err(format!("{label} cannot be '/'").into());
        }
        if reserved.contains(&value) {
            return Err(format!("{label} cannot use reserved listener path '{value}'").into());
        }
        if !seen.insert(value) {
            return Err(format!("embedded MCP paths must be unique; duplicate '{value}'").into());
        }
    }
    Ok(())
}

async fn append_lifecycle_event(
    log: &Arc<AnyEventLog>,
    kind: &str,
    payload: JsonValue,
) -> Result<(), OrchestratorError> {
    let topic =
        harn_vm::event_log::Topic::new(LIFECYCLE_TOPIC).map_err(|error| error.to_string())?;
    log.append(&topic, harn_vm::event_log::LogEvent::new(kind, payload))
        .await
        .map(|_| ())
        .map_err(|error| {
            OrchestratorError::Serve(format!(
                "failed to append orchestrator lifecycle event: {error}"
            ))
        })
}

async fn append_pump_lifecycle_event(
    log: &Arc<AnyEventLog>,
    kind: &str,
    payload: JsonValue,
) -> Result<(), OrchestratorError> {
    append_lifecycle_event(log, kind, payload).await
}

async fn wait_for_pump_drain_release(
    log: &Arc<AnyEventLog>,
    topic: &harn_vm::event_log::Topic,
    event_id: u64,
    gate_rx: &mut watch::Receiver<bool>,
) -> Result<(), OrchestratorError> {
    if !*gate_rx.borrow() {
        return Ok(());
    }
    append_pump_lifecycle_event(
        log,
        "pump_drain_waiting",
        json!({
            "topic": topic.as_str(),
            "event_log_id": event_id,
        }),
    )
    .await?;
    while *gate_rx.borrow() {
        if gate_rx.changed().await.is_err() {
            break;
        }
    }
    Ok(())
}

#[cfg_attr(not(unix), allow(dead_code))]
async fn append_manifest_event(
    log: &Arc<AnyEventLog>,
    kind: &str,
    payload: JsonValue,
) -> Result<(), OrchestratorError> {
    let topic =
        harn_vm::event_log::Topic::new(MANIFEST_TOPIC).map_err(|error| error.to_string())?;
    log.append(&topic, harn_vm::event_log::LogEvent::new(kind, payload))
        .await
        .map(|_| ())
        .map_err(|error| {
            OrchestratorError::Serve(format!(
                "failed to append orchestrator manifest event: {error}"
            ))
        })
}

async fn record_pump_metrics(
    metrics: &harn_vm::MetricsRegistry,
    log: &Arc<AnyEventLog>,
    topic: &harn_vm::event_log::Topic,
    last_seen: u64,
    outstanding: usize,
) -> Result<(), OrchestratorError> {
    let latest = log.latest(topic).await.ok().flatten().unwrap_or(last_seen);
    let backlog = latest.saturating_sub(last_seen);
    metrics.set_orchestrator_pump_outstanding(topic.as_str(), outstanding);
    metrics.set_orchestrator_pump_backlog(topic.as_str(), backlog);
    append_pump_lifecycle_event(
        log,
        "pump_metrics_recorded",
        json!({
            "topic": topic.as_str(),
            "latest": latest,
            "last_seen": last_seen,
            "backlog": backlog,
            "outstanding": outstanding,
        }),
    )
    .await
}

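/// Measures how long an event sat in the log before the pump admitted it,
/// clamped at zero when clocks disagree. For example, an event stamped
/// 1_000 ms in the past yields a one-second delay, while a timestamp from a
/// clock running ahead of this host yields zero.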
fn admission_delay(occurred_at_ms: i64) -> Duration {
    let now = std::time::SystemTime::now()
        .duration_since(std::time::UNIX_EPOCH)
        .unwrap_or_default()
        .as_millis() as i64;
    Duration::from_millis(now.saturating_sub(occurred_at_ms).max(0) as u64)
}

async fn emit_drain_truncated(
    log: &Arc<AnyEventLog>,
    topic_name: &str,
    report: PumpDrainReport,
    config: DrainConfig,
) -> Result<(), OrchestratorError> {
    if !report.truncated() {
        return Ok(());
    }
    eprintln!(
        "[harn] warning: pump drain truncated for {topic_name}: remaining_queued={} drain_items={} reason={}",
        report.remaining_queued,
        report.drain_items,
        report.stop_reason.as_str()
    );
    append_lifecycle_event(
        log,
        "drain_truncated",
        json!({
            "topic": topic_name,
            "remaining_queued": report.remaining_queued,
            "drain_items": report.drain_items,
            "outstanding_tasks": report.outstanding_tasks,
            "max_items": config.max_items,
            "deadline_secs": config.deadline.as_secs(),
            "reason": report.stop_reason.as_str(),
        }),
    )
    .await
}

async fn topic_latest_id(
    log: &Arc<AnyEventLog>,
    topic_name: &str,
) -> Result<u64, OrchestratorError> {
    let topic = harn_vm::event_log::Topic::new(topic_name).map_err(|error| error.to_string())?;
    log.latest(&topic)
        .await
        .map(|value| value.unwrap_or(0))
        .map_err(|error| {
            OrchestratorError::Serve(format!(
                "failed to read topic head for {topic_name}: {error}"
            ))
        })
}

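/// Drains one pump within the remaining shutdown budget. If the pump's own
/// drain errors out or the budget expires first, this falls back to a
/// best-effort report reconstructed from the consumer cursor, approximating
/// `processed` as the cursor delta rather than losing the report entirely.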
async fn drain_pump_best_effort(
    log: &Arc<AnyEventLog>,
    topic_name: &str,
    pump: PumpHandle,
    config: DrainConfig,
    overall_deadline: tokio::time::Instant,
) -> Result<PumpDrainReport, OrchestratorError> {
    let topic = harn_vm::event_log::Topic::new(topic_name).map_err(|error| error.to_string())?;
    let consumer = pump_consumer_id(&topic)?;
    let start_seen = log
        .consumer_cursor(&topic, &consumer)
        .await
        .map_err(|error| format!("failed to read consumer cursor for {topic_name}: {error}"))?
        .unwrap_or(0);
    let up_to = log
        .latest(&topic)
        .await
        .map_err(|error| format!("failed to read topic head for {topic_name}: {error}"))?
        .unwrap_or(0);
    let budget = remaining_budget(overall_deadline);

    match tokio::time::timeout(
        budget,
        pump.drain(log, topic_name, up_to, config, overall_deadline),
    )
    .await
    {
        Ok(Ok(report)) => Ok(report),
        Ok(Err(error)) => {
            eprintln!("[harn] warning: pump drain error for {topic_name}: {error}");
            best_effort_pump_report(
                log,
                &topic,
                &consumer,
                start_seen,
                up_to,
                PumpDrainStopReason::Error,
            )
            .await
        }
        Err(_) => {
            eprintln!("[harn] warning: pump drain timed out for {topic_name} after {budget:?}");
            best_effort_pump_report(
                log,
                &topic,
                &consumer,
                start_seen,
                up_to,
                PumpDrainStopReason::Deadline,
            )
            .await
        }
    }
}

async fn best_effort_pump_report(
    log: &Arc<AnyEventLog>,
    topic: &harn_vm::event_log::Topic,
    consumer: &ConsumerId,
    start_seen: u64,
    up_to: u64,
    stop_reason: PumpDrainStopReason,
) -> Result<PumpDrainReport, OrchestratorError> {
    let last_seen = log
        .consumer_cursor(topic, consumer)
        .await
        .map_err(|error| format!("failed to read consumer cursor for {topic}: {error}"))?
        .unwrap_or(start_seen);
    let stats = PumpStats {
        last_seen,
        processed: last_seen.saturating_sub(start_seen),
    };
    Ok(pump_drain_report(stats, start_seen, up_to, 0, stop_reason))
}

fn remaining_budget(deadline: tokio::time::Instant) -> Duration {
    deadline.saturating_duration_since(tokio::time::Instant::now())
}

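/// Decides whether a draining pump is finished: the drain completes once the
/// cursor reaches the requested head with no outstanding tasks, stops early
/// once the item budget is exhausted or the deadline passes, and while tasks
/// remain outstanding only the deadline can end it.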
fn maybe_finish_pump_drain(
    stats: PumpStats,
    progress: PumpDrainProgress,
    outstanding_tasks: usize,
) -> Option<PumpDrainReport> {
    if stats.last_seen >= progress.request.up_to && outstanding_tasks == 0 {
        return Some(pump_drain_report(
            stats,
            progress.start_seen,
            progress.request.up_to,
            outstanding_tasks,
            PumpDrainStopReason::Drained,
        ));
    }
    if outstanding_tasks > 0 {
        if tokio::time::Instant::now() >= progress.request.deadline {
            return Some(pump_drain_report(
                stats,
                progress.start_seen,
                progress.request.up_to,
                outstanding_tasks,
                PumpDrainStopReason::Deadline,
            ));
        }
        return None;
    }
    let drain_items = stats.last_seen.saturating_sub(progress.start_seen);
    if drain_items >= progress.request.config.max_items as u64 {
        return Some(pump_drain_report(
            stats,
            progress.start_seen,
            progress.request.up_to,
            outstanding_tasks,
            PumpDrainStopReason::MaxItems,
        ));
    }
    if tokio::time::Instant::now() >= progress.request.deadline {
        return Some(pump_drain_report(
            stats,
            progress.start_seen,
            progress.request.up_to,
            outstanding_tasks,
            PumpDrainStopReason::Deadline,
        ));
    }
    None
}

fn pump_drain_report(
    stats: PumpStats,
    start_seen: u64,
    up_to: u64,
    outstanding_tasks: usize,
    stop_reason: PumpDrainStopReason,
) -> PumpDrainReport {
    PumpDrainReport {
        stats,
        drain_items: stats.last_seen.saturating_sub(start_seen),
        remaining_queued: up_to.saturating_sub(stats.last_seen),
        outstanding_tasks,
        stop_reason,
    }
}

fn pump_consumer_id(topic: &harn_vm::event_log::Topic) -> Result<ConsumerId, OrchestratorError> {
    ConsumerId::new(format!("orchestrator-pump.{}", topic.as_str())).map_err(|error| {
        OrchestratorError::Serve(format!("failed to create consumer id for {topic}: {error}"))
    })
}

fn inbox_task_test_release_file() -> Option<PathBuf> {
    test_file_from_env(TEST_INBOX_TASK_RELEASE_FILE_ENV)
}

fn test_file_from_env(key: &str) -> Option<PathBuf> {
    std::env::var_os(key)
        .filter(|value| !value.is_empty())
        .map(PathBuf::from)
}

async fn wait_for_test_release_file(path: &Path) {
    while tokio::fs::metadata(path).await.is_err() {
        tokio::time::sleep(Duration::from_millis(10)).await;
    }
}

fn pending_pump_test_should_fail() -> bool {
    std::env::var(TEST_FAIL_PENDING_PUMP_ENV)
        .ok()
        .is_some_and(|value| value != "0")
}

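/// Watches the manifest's parent directory (non-recursively) and triggers a
/// `file_watch` reload when the manifest file itself changes. Events are
/// debounced: the forwarding task sleeps 200ms after the first notification
/// and discards anything queued in the meantime, so an editor that writes
/// the file several times in quick succession produces a single reload.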
fn spawn_manifest_watcher(
    config_path: PathBuf,
    reload: AdminReloadHandle,
) -> Result<notify::RecommendedWatcher, OrchestratorError> {
    use notify::{Event, EventKind, RecursiveMode, Watcher};

    let watch_dir = config_path.parent().ok_or_else(|| {
        format!(
            "manifest has no parent directory: {}",
            config_path.display()
        )
    })?;
    let target_name = config_path
        .file_name()
        .and_then(|name| name.to_str())
        .ok_or_else(|| {
            format!(
                "manifest path is not valid UTF-8: {}",
                config_path.display()
            )
        })?
        .to_string();
    let (tx, mut rx) = mpsc::unbounded_channel::<()>();
    tokio::task::spawn_local(async move {
        while rx.recv().await.is_some() {
            tokio::time::sleep(Duration::from_millis(200)).await;
            while rx.try_recv().is_ok() {}
            let _ = reload.trigger("file_watch");
        }
    });
    let mut watcher =
        notify::recommended_watcher(move |res: Result<Event, notify::Error>| match res {
            Ok(event)
                if matches!(
                    event.kind,
                    EventKind::Modify(_)
                        | EventKind::Create(_)
                        | EventKind::Remove(_)
                        | EventKind::Any
                ) && event.paths.iter().any(|path| {
                    path.file_name()
                        .and_then(|name| name.to_str())
                        .is_some_and(|name| name == target_name)
                }) =>
            {
                let _ = tx.send(());
            }
            _ => {}
        })
        .map_err(|error| format!("failed to create manifest watcher: {error}"))?;
    watcher
        .watch(watch_dir, RecursiveMode::NonRecursive)
        .map_err(|error| {
            format!(
                "failed to watch manifest directory {}: {error}",
                watch_dir.display()
            )
        })?;
    Ok(watcher)
}

pub(crate) fn load_manifest(config_path: &Path) -> Result<(Manifest, PathBuf), OrchestratorError> {
    if !config_path.is_file() {
        return Err(format!("manifest not found: {}", config_path.display()).into());
    }
    let content = std::fs::read_to_string(config_path)
        .map_err(|error| format!("failed to read {}: {error}", config_path.display()))?;
    let manifest = toml::from_str::<Manifest>(&content)
        .map_err(|error| format!("failed to parse {}: {error}", config_path.display()))?;
    let manifest_dir = config_path.parent().map(Path::to_path_buf).ok_or_else(|| {
        format!(
            "manifest has no parent directory: {}",
            config_path.display()
        )
    })?;
    Ok((manifest, manifest_dir))
}

pub(crate) fn absolutize_from_cwd(path: &Path) -> Result<PathBuf, OrchestratorError> {
    let candidate = if path.is_absolute() {
        path.to_path_buf()
    } else {
        std::env::current_dir()
            .map_err(|error| format!("failed to read current directory: {error}"))?
            .join(path)
    };
    Ok(candidate)
}

fn configured_secret_chain_display() -> String {
    std::env::var(harn_vm::secrets::SECRET_PROVIDER_CHAIN_ENV)
        .unwrap_or_else(|_| harn_vm::secrets::DEFAULT_SECRET_PROVIDER_CHAIN.to_string())
        .split(',')
        .map(str::trim)
        .filter(|segment| !segment.is_empty())
        .collect::<Vec<_>>()
        .join(" -> ")
}

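/// Resolves the secret namespace: an explicit `HARN_SECRET_NAMESPACE` wins;
/// otherwise the namespace is derived from the manifest directory's leaf
/// name, falling back to "workspace". As an illustrative example, a manifest
/// in `/srv/billing` would map to `harn/billing`.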
fn secret_namespace_for(manifest_dir: &Path) -> String {
    match std::env::var("HARN_SECRET_NAMESPACE") {
        Ok(namespace) if !namespace.trim().is_empty() => namespace,
        _ => {
            let leaf = manifest_dir
                .file_name()
                .and_then(|name| name.to_str())
                .filter(|name| !name.is_empty())
                .unwrap_or("workspace");
            format!("harn/{leaf}")
        }
    }
}

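/// Current UTC time formatted as an RFC 3339 string.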
fn now_rfc3339() -> Result<String, OrchestratorError> {
    OffsetDateTime::now_utc()
        .format(&Rfc3339)
        .map_err(|error| OrchestratorError::Serve(format!("failed to format timestamp: {error}")))
}

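/// Pretty-prints the snapshot as JSON and writes it to `path`, creating
/// parent directories as needed. The write is not atomic, so a concurrent
/// reader may observe a partially written file.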
fn write_state_snapshot(
    path: &Path,
    snapshot: &ServeStateSnapshot,
) -> Result<(), OrchestratorError> {
    let encoded = serde_json::to_vec_pretty(snapshot)
        .map_err(|error| format!("failed to encode orchestrator state snapshot: {error}"))?;
    if let Some(parent) = path.parent() {
        std::fs::create_dir_all(parent)
            .map_err(|error| format!("failed to create {}: {error}", parent.display()))?;
    }
    std::fs::write(path, encoded).map_err(|error| {
        OrchestratorError::Serve(format!("failed to write {}: {error}", path.display()))
    })
}

// ── Snapshot types ────────────────────────────────────────────────────────────

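/// Top-level shape of the serve-state snapshot persisted as
/// `orchestrator-state.json` (see `STATE_SNAPSHOT_FILE`).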
#[derive(Debug, Serialize)]
struct ServeStateSnapshot {
    status: String,
    role: String,
    bind: String,
    listener_url: String,
    manifest_path: String,
    state_dir: String,
    started_at: String,
    stopped_at: Option<String>,
    secret_provider_chain: String,
    event_log_backend: String,
    event_log_location: Option<String>,
    triggers: Vec<TriggerStateSnapshot>,
    connectors: Vec<String>,
    activations: Vec<ConnectorActivationSnapshot>,
}

#[derive(Debug, Serialize)]
struct TriggerStateSnapshot {
    id: String,
    provider: String,
    kind: String,
    handler: String,
    version: Option<u32>,
    state: Option<String>,
    received: u64,
    dispatched: u64,
    failed: u64,
    in_flight: u64,
}

#[derive(Debug, Serialize)]
struct ConnectorActivationSnapshot {
    provider: String,
    binding_count: usize,
}

// ── Placeholder connector ─────────────────────────────────────────────────────

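/// Stand-in `Connector` for providers without a real implementation: it
/// accepts activation (reporting the binding count) but rejects inbound
/// normalization and client calls with descriptive errors. The init context
/// is stored but never read, hence the `_ctx` name.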
struct PlaceholderConnector {
    provider_id: harn_vm::ProviderId,
    kinds: Vec<harn_vm::TriggerKind>,
    _ctx: Option<harn_vm::ConnectorCtx>,
}

impl PlaceholderConnector {
    fn new(provider_id: harn_vm::ProviderId, kinds: BTreeSet<String>) -> Self {
        Self {
            provider_id,
            kinds: kinds.into_iter().map(harn_vm::TriggerKind::from).collect(),
            _ctx: None,
        }
    }
}

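/// Client stub backing [`PlaceholderConnector::client`]; every method call
/// fails with an error naming the unimplemented method.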
struct PlaceholderClient;

#[async_trait]
impl harn_vm::ConnectorClient for PlaceholderClient {
    async fn call(
        &self,
        method: &str,
        _args: JsonValue,
    ) -> Result<JsonValue, harn_vm::ClientError> {
        Err(harn_vm::ClientError::Other(format!(
            "connector client method '{method}' is not implemented in the orchestrator scaffold"
        )))
    }
}

#[async_trait]
impl harn_vm::Connector for PlaceholderConnector {
    fn provider_id(&self) -> &harn_vm::ProviderId {
        &self.provider_id
    }

    fn kinds(&self) -> &[harn_vm::TriggerKind] {
        &self.kinds
    }

    async fn init(&mut self, ctx: harn_vm::ConnectorCtx) -> Result<(), harn_vm::ConnectorError> {
        self._ctx = Some(ctx);
        Ok(())
    }

    async fn activate(
        &self,
        bindings: &[harn_vm::TriggerBinding],
    ) -> Result<harn_vm::ActivationHandle, harn_vm::ConnectorError> {
        Ok(harn_vm::ActivationHandle::new(
            self.provider_id.clone(),
            bindings.len(),
        ))
    }

    async fn normalize_inbound(
        &self,
        _raw: harn_vm::RawInbound,
    ) -> Result<harn_vm::TriggerEvent, harn_vm::ConnectorError> {
        Err(harn_vm::ConnectorError::Unsupported(format!(
            "connector '{}' inbound normalization is not implemented yet",
            self.provider_id.as_str()
        )))
    }

    fn payload_schema(&self) -> harn_vm::ProviderPayloadSchema {
        harn_vm::ProviderPayloadSchema::named("TriggerEvent")
    }

    fn client(&self) -> Arc<dyn harn_vm::ConnectorClient> {
        Arc::new(PlaceholderClient)
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use futures::StreamExt;
    use harn_vm::event_log::{EventLog, Topic};

    fn write_test_file(dir: &Path, relative: &str, contents: &str) {
        let path = dir.join(relative);
        if let Some(parent) = path.parent() {
            std::fs::create_dir_all(parent).unwrap();
        }
        std::fs::write(path, contents).unwrap();
    }

    fn stream_manifest_fixture() -> &'static str {
        r#"
[package]
name = "fixture"

[exports]
handlers = "lib.harn"

[[triggers]]
id = "ws-stream"
kind = "stream"
provider = "websocket"
path = "/streams/ws"
match = { events = ["quote.tick"] }
handler = "handlers::on_stream"
"#
    }

    fn stream_handler_fixture(marker_path: &Path) -> String {
        format!(
            r#"
import "std/triggers"

pub fn on_stream(event: TriggerEvent) {{
  write_file({marker:?}, json_stringify({{
    provider: event.provider,
    kind: event.kind,
    key: event.provider_payload.key,
    stream: event.provider_payload.stream,
    amount: event.provider_payload.raw.value.amount,
  }}))
}}
"#,
            marker = marker_path.display().to_string()
        )
    }

    /// Proof test: `stream_trigger_route_uses_generic_stream_connector`,
    /// migrated from a subprocess + SQLite-polling setup to the in-process
    /// harness with `EventLog::subscribe()`.
    #[tokio::test(flavor = "multi_thread")]
    async fn stream_trigger_route_uses_generic_stream_connector_in_process() {
        // Env vars are process-global; hold the lock for the entire test so
        // concurrent unit tests that also set env vars don't race.
        let _env_lock = crate::tests::common::env_lock::lock_env().lock().await;
        let _secret_providers = crate::env_guard::ScopedEnvVar::set("HARN_SECRET_PROVIDERS", "env");

        let temp = tempfile::TempDir::new().unwrap();
        let marker_path = temp.path().join("stream-handler.json");
        write_test_file(temp.path(), "harn.toml", stream_manifest_fixture());
        write_test_file(
            temp.path(),
            "lib.harn",
            &stream_handler_fixture(&marker_path),
        );

        let config =
            OrchestratorConfig::for_test(temp.path().join("harn.toml"), temp.path().join("state"));
        let harness = OrchestratorHarness::start(config)
            .await
            .expect("harness start");
        let base_url = harness.listener_url().to_string();
        let event_log = harness.event_log();

        let response = reqwest::Client::new()
            .post(format!("{base_url}/streams/ws"))
            .header("content-type", "application/json")
            .json(&serde_json::json!({
                "key": "acct-1",
                "stream": "quotes",
                "value": {"amount": 10}
            }))
            .send()
            .await
            .unwrap();
        assert_eq!(response.status(), 200);

        // Event-driven wait: subscribe to orchestrator.lifecycle and block
        // until pump_dispatch_completed arrives, replacing SQLite polling.
        let topic = Topic::new("orchestrator.lifecycle").unwrap();
        let mut stream = event_log.clone().subscribe(&topic, None).await.unwrap();
        let deadline = tokio::time::Instant::now() + std::time::Duration::from_secs(30);
        loop {
            let remaining = deadline
                .checked_duration_since(tokio::time::Instant::now())
                .expect("timed out waiting for pump_dispatch_completed");
            let (_, event) = tokio::time::timeout(remaining, stream.next())
                .await
                .expect("timed out waiting for pump_dispatch_completed event")
                .expect("event stream ended unexpectedly")
                .expect("event stream error");
            if event.kind == "pump_dispatch_completed"
                && event.payload["status"] == serde_json::json!("completed")
            {
                break;
            }
        }
        drop(stream);

        let marker: serde_json::Value =
            serde_json::from_str(&std::fs::read_to_string(&marker_path).unwrap()).unwrap();
        assert_eq!(
            marker.get("provider").and_then(|v| v.as_str()),
            Some("websocket")
        );
        assert_eq!(
            marker.get("kind").and_then(|v| v.as_str()),
            Some("quote.tick")
        );
        assert_eq!(marker.get("key").and_then(|v| v.as_str()), Some("acct-1"));
        assert_eq!(
            marker.get("stream").and_then(|v| v.as_str()),
            Some("quotes")
        );
        assert_eq!(marker.get("amount").and_then(|v| v.as_i64()), Some(10));

        harness
            .shutdown(std::time::Duration::from_secs(5))
            .await
            .expect("harness shutdown");
    }
}