
harn_cli/commands/orchestrator/harness.rs

use std::collections::{BTreeMap, BTreeSet};
use std::net::SocketAddr;
use std::path::{Path, PathBuf};
use std::sync::Arc;
use std::time::Duration;

use async_trait::async_trait;
use futures::StreamExt;
use serde::{Deserialize, Serialize};
use serde_json::{json, Value as JsonValue};
use time::format_description::well_known::Rfc3339;
use time::OffsetDateTime;
use tokio::sync::{mpsc, oneshot, watch};
use tokio::task::JoinSet;

use harn_vm::event_log::{AnyEventLog, ConsumerId, EventLog};

use super::common::stranded_envelopes;
use super::errors::OrchestratorError;
use super::listener::{
    AdminReloadHandle, AdminReloadRequest, ListenerConfig, ListenerRuntime, RouteConfig,
    TriggerMetricSnapshot,
};
use super::origin_guard::OriginAllowList;
use super::role::OrchestratorRole;
use super::supervisor_state::apply_supervisor_state;
use super::tls::TlsFiles;
use crate::package::{
    self, CollectedManifestTrigger, CollectedTriggerHandler, Manifest,
    ResolvedProviderConnectorConfig, ResolvedProviderConnectorKind, ResolvedTriggerConfig,
};

const LIFECYCLE_TOPIC: &str = "orchestrator.lifecycle";
#[cfg_attr(not(unix), allow(dead_code))]
const MANIFEST_TOPIC: &str = "orchestrator.manifest";
const STATE_SNAPSHOT_FILE: &str = "orchestrator-state.json";
const PENDING_TOPIC: &str = "orchestrator.triggers.pending";
const CRON_TICK_TOPIC: &str = "connectors.cron.tick";
const TEST_INBOX_TASK_RELEASE_FILE_ENV: &str = "HARN_TEST_ORCHESTRATOR_INBOX_TASK_RELEASE_FILE";
const TEST_FAIL_PENDING_PUMP_ENV: &str = "HARN_TEST_ORCHESTRATOR_FAIL_PENDING_PUMP";
const WAITPOINT_SERVICE_INTERVAL: Duration = Duration::from_millis(250);

// ── Public config ─────────────────────────────────────────────────────────────

/// Drain limits applied during graceful shutdown.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub struct DrainConfig {
    pub max_items: usize,
    pub deadline: Duration,
}

impl Default for DrainConfig {
    fn default() -> Self {
        Self {
            max_items: crate::package::default_orchestrator_drain_max_items(),
            deadline: Duration::from_secs(
                crate::package::default_orchestrator_drain_deadline_seconds(),
            ),
        }
    }
}

/// Topic-pump concurrency limit.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub struct PumpConfig {
    pub max_outstanding: usize,
}

impl Default for PumpConfig {
    fn default() -> Self {
        Self {
            max_outstanding: crate::package::default_orchestrator_pump_max_outstanding(),
        }
    }
}

/// Configuration for `OrchestratorHarness::start`.
#[derive(Clone, Debug)]
pub struct OrchestratorConfig {
    pub manifest_path: PathBuf,
    pub state_dir: PathBuf,
    pub bind: SocketAddr,
    pub role: OrchestratorRole,
    pub watch_manifest: bool,
    pub mcp: bool,
    pub mcp_path: String,
    pub mcp_sse_path: String,
    pub mcp_messages_path: String,
    pub tls: Option<TlsFiles>,
    pub shutdown_timeout: Duration,
    pub drain: DrainConfig,
    pub pump: PumpConfig,
    /// When `Some`, installs an observability tracing subscriber.  Tests
    /// should leave this `None` to avoid conflicts with the test runtime.
    pub log_format: Option<harn_vm::observability::otel::LogFormat>,
}

impl OrchestratorConfig {
    /// Minimal defaults suitable for integration tests.
    pub fn for_test(manifest_path: PathBuf, state_dir: PathBuf) -> Self {
        Self {
            manifest_path,
            state_dir,
            bind: "127.0.0.1:0".parse().unwrap(),
            role: OrchestratorRole::SingleTenant,
            watch_manifest: false,
            mcp: false,
            mcp_path: "/mcp".to_string(),
            mcp_sse_path: "/sse".to_string(),
            mcp_messages_path: "/messages".to_string(),
            tls: None,
            shutdown_timeout: Duration::from_secs(5),
            drain: DrainConfig::default(),
            pump: PumpConfig::default(),
            log_format: None,
        }
    }
}

// ── Public harness ────────────────────────────────────────────────────────────

/// Summary returned by `OrchestratorHarness::shutdown`.
#[derive(Debug)]
pub struct ShutdownReport {
    #[allow(dead_code)]
    pub timed_out: bool,
}

/// Error type for harness operations.
#[derive(Debug)]
pub struct HarnessError(pub String);

impl std::fmt::Display for HarnessError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.write_str(&self.0)
    }
}

impl From<OrchestratorError> for HarnessError {
    fn from(error: OrchestratorError) -> Self {
        HarnessError(error.to_string())
    }
}

impl From<String> for HarnessError {
    fn from(s: String) -> Self {
        HarnessError(s)
    }
}

/// In-process orchestrator runtime.
///
/// Starts all pumps, connectors, and the HTTP listener in a dedicated
/// background thread that runs its own small multi-thread Tokio runtime
/// driving a `LocalSet`. Tests hold `Arc<AnyEventLog>` directly and use
/// `EventLog::subscribe()` for event-driven waits instead of polling.
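///
/// A minimal usage sketch (test-style; `manifest_path` and `state_dir` are
/// assumed to have been prepared by the caller):
///
/// ```ignore
/// let config = OrchestratorConfig::for_test(manifest_path, state_dir);
/// let harness = OrchestratorHarness::start(config).await?;
/// let url = harness.listener_url().to_string();
/// // ... drive requests against `url` and subscribe to the event log ...
/// harness.shutdown(std::time::Duration::from_secs(5)).await?;
/// ```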
#[allow(dead_code)]
pub struct OrchestratorHarness {
    event_log: Arc<AnyEventLog>,
    listener_url: String,
    local_addr: SocketAddr,
    state_dir: PathBuf,
    admin_reload: AdminReloadHandle,
    shutdown_tx: Arc<watch::Sender<bool>>,
    pump_drain_gate: PumpDrainGate,
    join: Option<std::thread::JoinHandle<()>>,
}

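/// Values handed back to `OrchestratorHarness::start` once the background
/// runtime has finished initializing.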
struct ReadyState {
    event_log: Arc<AnyEventLog>,
    listener_url: String,
    local_addr: SocketAddr,
    state_dir: PathBuf,
    admin_reload: AdminReloadHandle,
}

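/// Test-facing gate that holds topic pumps at their next event admission by
/// broadcasting a pause flag over a shared `watch` channel.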
#[derive(Clone)]
struct PumpDrainGate {
    hold_tx: watch::Sender<bool>,
}

impl PumpDrainGate {
    fn new() -> Self {
        let (hold_tx, _) = watch::channel(false);
        Self { hold_tx }
    }

    fn pause(&self) {
        let _ = self.hold_tx.send(true);
    }

    fn release(&self) {
        let _ = self.hold_tx.send(false);
    }

    fn subscribe(&self) -> watch::Receiver<bool> {
        self.hold_tx.subscribe()
    }
}

#[allow(dead_code)]
impl OrchestratorHarness {
    /// Start the orchestrator in-process.  Resolves once the HTTP listener
    /// is ready and the startup lifecycle event has been appended.
    pub async fn start(config: OrchestratorConfig) -> Result<Self, HarnessError> {
        let (ready_tx, ready_rx) = oneshot::channel::<Result<ReadyState, OrchestratorError>>();
        let (shutdown_tx, shutdown_rx) = watch::channel(false);
        let shutdown_tx = Arc::new(shutdown_tx);
        let pump_drain_gate = PumpDrainGate::new();
        let task_pump_drain_gate = pump_drain_gate.clone();

        let join = std::thread::spawn(move || {
            // Use a multi-thread runtime so that blocking I/O (e.g. the OTEL
            // SimpleSpanProcessor calling futures::executor::block_on for each
            // span) can proceed on reactor threads while the LocalSet thread is
            // temporarily paused.  A current-thread runtime would deadlock:
            // block_on holds the only thread while the reactor needs that same
            // thread to drive the TCP export.
            let rt = tokio::runtime::Builder::new_multi_thread()
                .worker_threads(2)
                .enable_all()
                .build()
                .expect("failed to build OrchestratorHarness tokio runtime");
            let local = tokio::task::LocalSet::new();
            rt.block_on(local.run_until(orchestrator_task(
                config,
                ready_tx,
                shutdown_rx,
                task_pump_drain_gate,
            )));
        });

        match ready_rx.await {
            Ok(Ok(ready)) => Ok(Self {
                event_log: ready.event_log,
                listener_url: ready.listener_url,
                local_addr: ready.local_addr,
                state_dir: ready.state_dir,
                admin_reload: ready.admin_reload,
                shutdown_tx,
                pump_drain_gate,
                join: Some(join),
            }),
            Ok(Err(error)) => {
                let _ = join.join();
                Err(HarnessError::from(error))
            }
            Err(_) => {
                let _ = join.join();
                Err(HarnessError(
                    "harness thread exited before signaling readiness".to_string(),
                ))
            }
        }
    }

    pub fn listener_url(&self) -> &str {
        &self.listener_url
    }

    pub fn local_addr(&self) -> SocketAddr {
        self.local_addr
    }

    pub fn event_log(&self) -> Arc<AnyEventLog> {
        self.event_log.clone()
    }

    pub fn state_dir(&self) -> &Path {
        &self.state_dir
    }

    /// Returns a handle that can be used to trigger admin-reload from outside
    /// the harness (e.g. on SIGHUP in the CLI wrapper).
    pub fn admin_reload(&self) -> AdminReloadHandle {
        self.admin_reload.clone()
    }

    /// Returns a sender that, when `send(true)` is called, triggers graceful
    /// shutdown.  Use this to wire OS signal handlers in the CLI wrapper.
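    ///
    /// A sketch of wiring Ctrl-C to graceful shutdown (assumes the wrapper
    /// already runs inside a Tokio context):
    ///
    /// ```ignore
    /// let trigger = harness.shutdown_trigger();
    /// tokio::spawn(async move {
    ///     let _ = tokio::signal::ctrl_c().await;
    ///     let _ = trigger.send(true);
    /// });
    /// ```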
    pub fn shutdown_trigger(&self) -> Arc<watch::Sender<bool>> {
        self.shutdown_tx.clone()
    }

    /// Pause topic pumps at the next event admission. Tests can wait for
    /// `pump_drain_waiting` before triggering shutdown.
    pub fn pause_pump_drain(&self) {
        self.pump_drain_gate.pause();
    }

    /// Release topic pumps paused by `pause_pump_drain`.
    pub fn release_pump_drain(&self) {
        self.pump_drain_gate.release();
    }

    /// Cancel-safe, idempotent.  Performs the same drain logic as SIGTERM.
    pub async fn shutdown(mut self, _deadline: Duration) -> Result<ShutdownReport, HarnessError> {
        // Signal the background runtime to start graceful shutdown.
        let _ = self.shutdown_tx.send(true);
        // Wait for the background thread to finish (graceful shutdown runs there).
        let join = self.join.take().expect("join handle");
        tokio::task::spawn_blocking(move || join.join())
            .await
            .map_err(|_| HarnessError("spawn_blocking join failed".to_string()))?
            .map_err(|_| HarnessError("harness background thread panicked".to_string()))?;
        Ok(ShutdownReport { timed_out: false })
    }
}

impl Drop for OrchestratorHarness {
    fn drop(&mut self) {
        let _ = self.shutdown_tx.send(true);
        if let Some(join) = self.join.take() {
            let _ = join.join();
        }
    }
}

// ── Internal lifecycle ────────────────────────────────────────────────────────

async fn orchestrator_task(
    config: OrchestratorConfig,
    ready_tx: oneshot::Sender<Result<ReadyState, OrchestratorError>>,
    shutdown_rx: watch::Receiver<bool>,
    pump_drain_gate: PumpDrainGate,
) {
    if let Err(error) = orchestrator_lifecycle(config, ready_tx, shutdown_rx, pump_drain_gate).await
    {
        eprintln!("[harn] orchestrator harness error: {error}");
    }
}

async fn orchestrator_lifecycle(
    config: OrchestratorConfig,
    ready_tx: oneshot::Sender<Result<ReadyState, OrchestratorError>>,
    mut shutdown_rx: watch::Receiver<bool>,
    pump_drain_gate: PumpDrainGate,
) -> Result<(), OrchestratorError> {
    harn_vm::reset_thread_local_state();

    let shutdown_timeout = config.shutdown_timeout;
    let drain_config = config.drain;
    let pump_config = PumpConfig {
        max_outstanding: config.pump.max_outstanding.max(1),
    };

    let state_dir = config.state_dir.clone();
    std::fs::create_dir_all(&state_dir).map_err(|error| {
        format!(
            "failed to create state dir {}: {error}",
            state_dir.display()
        )
    })?;

    let observability = if let Some(log_format) = config.log_format {
        Some(
            harn_vm::observability::otel::ObservabilityGuard::install_orchestrator_subscriber(
                harn_vm::observability::otel::OrchestratorObservabilityConfig {
                    log_format,
                    state_dir: Some(state_dir.clone()),
                },
            )?,
        )
    } else {
        None
    };

    let config_path = absolutize_from_cwd(&config.manifest_path)?;
    let (manifest, manifest_dir) = load_manifest(&config_path)?;
    let drain_config = DrainConfig {
        max_items: drain_config.max_items.max(1),
        deadline: drain_config.deadline,
    };
    let pump_config = PumpConfig {
        max_outstanding: pump_config.max_outstanding.max(1),
    };

    let startup_started_at = now_rfc3339()?;
    let (admin_reload, mut reload_rx) = AdminReloadHandle::channel();

    eprintln!("[harn] orchestrator manifest: {}", config_path.display());
    if let Some(name) = manifest
        .package
        .as_ref()
        .and_then(|package| package.name.as_deref())
    {
        eprintln!("[harn] orchestrator package: {name}");
    }
    eprintln!(
        "[harn] orchestrator role: {} ({})",
        config.role.as_str(),
        config.role.registry_mode()
    );
    eprintln!("[harn] orchestrator state dir: {}", state_dir.display());
    tracing::info!(
        component = "orchestrator",
        trace_id = "",
        role = config.role.as_str(),
        state_dir = %state_dir.display(),
        manifest = %config_path.display(),
        "orchestrator starting"
    );

    let workspace_root = manifest_dir.clone();
    let mut vm = config
        .role
        .build_vm(&workspace_root, &manifest_dir, &state_dir)?;

    let event_log = harn_vm::event_log::active_event_log()
        .ok_or_else(|| "event log was not installed during VM initialization".to_string())?;
    let event_log_description = event_log.describe();
    let tenant_store = if config.role == OrchestratorRole::MultiTenant {
        let store = harn_vm::TenantStore::load(&state_dir)?;
        let active_tenants = store
            .list()
            .into_iter()
            .filter(|tenant| tenant.status == harn_vm::TenantStatus::Active)
            .collect::<Vec<_>>();
        eprintln!(
            "[harn] tenants loaded: {} active ({})",
            active_tenants.len(),
            active_tenants
                .iter()
                .map(|tenant| tenant.scope.id.0.as_str())
                .collect::<Vec<_>>()
                .join(", ")
        );
        Some(Arc::new(store))
    } else {
        None
    };
    eprintln!(
        "[harn] event log: {} {}",
        event_log_description.backend,
        event_log_description
            .location
            .as_ref()
            .map(|path| path.display().to_string())
            .unwrap_or_else(|| "<memory>".to_string())
    );

    let secret_namespace = secret_namespace_for(&manifest_dir);
    let secret_chain_display = configured_secret_chain_display();
    let secret_chain = harn_vm::secrets::configured_default_chain(secret_namespace.clone())
        .map_err(|error| format!("failed to configure secret providers: {error}"))?;
    if secret_chain.providers().is_empty() {
        return Err("secret provider chain resolved to zero providers"
            .to_string()
            .into());
    }
    eprintln!(
        "[harn] secret providers: {} (namespace {})",
        secret_chain_display, secret_namespace
    );
    let secret_provider: Arc<dyn harn_vm::secrets::SecretProvider> = Arc::new(secret_chain);

    let extensions = package::load_runtime_extensions(&config_path);
    let metrics_registry = Arc::new(harn_vm::MetricsRegistry::default());
    harn_vm::install_active_metrics_registry(metrics_registry.clone());
    let collected_triggers = package::collect_manifest_triggers(&mut vm, &extensions)
        .await
        .map_err(|error| format!("failed to collect manifest triggers: {error}"))?;
    package::install_collected_manifest_triggers(&collected_triggers).await?;
    apply_supervisor_state(&state_dir).await?;
    eprintln!(
        "[harn] registered triggers ({}): {}",
        collected_triggers.len(),
        format_trigger_summary(&collected_triggers)
    );

    let binding_versions = live_manifest_binding_versions();
    let route_configs = build_route_configs(&collected_triggers, &binding_versions)?;
    let mut connector_runtime = initialize_connectors(
        &collected_triggers,
        event_log.clone(),
        secret_provider.clone(),
        metrics_registry.clone(),
        &extensions.provider_connectors,
    )
    .await?;
    let route_configs = attach_route_connectors(
        route_configs,
        &connector_runtime.registry,
        &extensions.provider_connectors,
    )?;
    let connector_clients = connector_runtime.registry.client_map().await;
    harn_vm::install_active_connector_clients(connector_clients);
    eprintln!(
        "[harn] registered connectors ({}): {}",
        connector_runtime.providers.len(),
        connector_runtime.providers.join(", ")
    );
    eprintln!(
        "[harn] activated connectors: {}",
        format_activation_summary(&connector_runtime.activations)
    );
    let (mcp_router, mcp_service) = if config.mcp {
        validate_mcp_paths(
            &config.mcp_path,
            &config.mcp_sse_path,
            &config.mcp_messages_path,
        )?;
        if !has_orchestrator_api_keys_configured() && !has_mcp_oauth_configured() {
            return Err(OrchestratorError::Serve(
                "--mcp requires HARN_ORCHESTRATOR_API_KEYS or HARN_MCP_OAUTH_AUTHORIZATION_SERVERS so the embedded MCP management surface is authenticated"
                    .to_string(),
            ));
        }
        let service = Arc::new(
            crate::commands::mcp::serve::McpOrchestratorService::new_local(
                crate::cli::OrchestratorLocalArgs {
                    config: config_path.clone(),
                    state_dir: state_dir.clone(),
                },
            )?,
        );
        let router = crate::commands::mcp::serve::http_router_for_service(
            service.clone(),
            config.mcp_path.clone(),
            config.mcp_sse_path.clone(),
            config.mcp_messages_path.clone(),
        );
        eprintln!(
            "[harn] embedded MCP server mounted at {} (legacy SSE {}, messages {})",
            config.mcp_path, config.mcp_sse_path, config.mcp_messages_path
        );
        (Some(router), Some(service))
    } else {
        (None, None)
    };

    let dispatcher = harn_vm::Dispatcher::with_event_log_and_metrics(
        vm,
        event_log.clone(),
        Some(metrics_registry.clone()),
    );
    let mut pending_pumps = vec![(
        PENDING_TOPIC.to_string(),
        spawn_pending_pump(
            event_log.clone(),
            dispatcher.clone(),
            pump_config,
            metrics_registry.clone(),
            pump_drain_gate.clone(),
            PENDING_TOPIC,
        )?,
    )];
    let mut inbox_pumps = vec![(
        harn_vm::TRIGGER_INBOX_ENVELOPES_TOPIC.to_string(),
        spawn_inbox_pump(
            event_log.clone(),
            dispatcher.clone(),
            pump_config,
            metrics_registry.clone(),
            harn_vm::TRIGGER_INBOX_ENVELOPES_TOPIC,
        )?,
    )];
    if let Some(store) = tenant_store.as_ref() {
        for tenant in store
            .list()
            .into_iter()
            .filter(|tenant| tenant.status == harn_vm::TenantStatus::Active)
        {
            let pending_topic = harn_vm::tenant_topic(
                &tenant.scope.id,
                &harn_vm::event_log::Topic::new(PENDING_TOPIC)
                    .map_err(|error| error.to_string())?,
            )
            .map_err(|error| error.to_string())?;
            pending_pumps.push((
                pending_topic.as_str().to_string(),
                spawn_pending_pump(
                    event_log.clone(),
                    dispatcher.clone(),
                    pump_config,
                    metrics_registry.clone(),
                    pump_drain_gate.clone(),
                    pending_topic.as_str(),
                )?,
            ));
            let inbox_topic = harn_vm::tenant_topic(
                &tenant.scope.id,
                &harn_vm::event_log::Topic::new(harn_vm::TRIGGER_INBOX_ENVELOPES_TOPIC)
                    .map_err(|error| error.to_string())?,
            )
            .map_err(|error| error.to_string())?;
            inbox_pumps.push((
                inbox_topic.as_str().to_string(),
                spawn_inbox_pump(
                    event_log.clone(),
                    dispatcher.clone(),
                    pump_config,
                    metrics_registry.clone(),
                    inbox_topic.as_str(),
                )?,
            ));
        }
    }
    let cron_pump = spawn_cron_pump(
        event_log.clone(),
        dispatcher.clone(),
        pump_config,
        metrics_registry.clone(),
        pump_drain_gate.clone(),
    )?;
    let waitpoint_pump = spawn_waitpoint_resume_pump(
        event_log.clone(),
        dispatcher.clone(),
        pump_config,
        metrics_registry.clone(),
        pump_drain_gate.clone(),
    )?;
    let waitpoint_cancel_pump = spawn_waitpoint_cancel_pump(
        event_log.clone(),
        dispatcher.clone(),
        pump_config,
        metrics_registry.clone(),
        pump_drain_gate.clone(),
    )?;
    let waitpoint_sweeper = spawn_waitpoint_sweeper(dispatcher.clone());

    let listener = ListenerRuntime::start(ListenerConfig {
        bind: config.bind,
        tls: config.tls.clone(),
        event_log: event_log.clone(),
        secrets: secret_provider.clone(),
        allowed_origins: OriginAllowList::from_manifest(&manifest.orchestrator.allowed_origins),
        max_body_bytes: ListenerConfig::max_body_bytes_or_default(
            manifest.orchestrator.max_body_bytes,
        ),
        metrics_registry: metrics_registry.clone(),
        admin_reload: Some(admin_reload.clone()),
        mcp_router,
        routes: route_configs,
        tenant_store: tenant_store.clone(),
        session_store: Some(Arc::new(harn_vm::SessionStore::new(event_log.clone()))),
    })
    .await?;
    let local_bind = listener.local_addr();
    let listener_metrics = listener.trigger_metrics();
    let mut live_manifest = manifest;
    let mut live_triggers = collected_triggers;
    let _manifest_watcher = if config.watch_manifest {
        Some(spawn_manifest_watcher(
            config_path.clone(),
            admin_reload.clone(),
        )?)
    } else {
        None
    };
    connector_runtime.activations = connector_runtime
        .registry
        .activate_all(&connector_runtime.trigger_registry)
        .await
        .map_err(|error| error.to_string())?;
    eprintln!(
        "[harn] activated connectors: {}",
        format_activation_summary(&connector_runtime.activations)
    );

    listener.mark_ready();
    eprintln!("[harn] HTTP listener ready on {}", listener.url());
    tracing::info!(
        component = "orchestrator",
        trace_id = "",
        listener_url = %listener.url(),
        "HTTP listener ready"
    );

    write_state_snapshot(
        &state_dir.join(STATE_SNAPSHOT_FILE),
        &ServeStateSnapshot {
            status: "running".to_string(),
            role: config.role.as_str().to_string(),
            bind: local_bind.to_string(),
            listener_url: listener.url(),
            manifest_path: config_path.display().to_string(),
            state_dir: state_dir.display().to_string(),
            started_at: startup_started_at.clone(),
            stopped_at: None,
            secret_provider_chain: secret_chain_display.clone(),
            event_log_backend: event_log_description.backend.to_string(),
            event_log_location: event_log_description
                .location
                .as_ref()
                .map(|path| path.display().to_string()),
            triggers: trigger_state_snapshots(&live_triggers, &listener_metrics),
            connectors: connector_runtime.providers.clone(),
            activations: connector_runtime
                .activations
                .iter()
                .map(|activation| ConnectorActivationSnapshot {
                    provider: activation.provider.as_str().to_string(),
                    binding_count: activation.binding_count,
                })
                .collect(),
        },
    )?;

    append_lifecycle_event(
        &event_log,
        "startup",
        json!({
            "bind": local_bind.to_string(),
            "manifest": config_path.display().to_string(),
            "role": config.role.as_str(),
            "state_dir": state_dir.display().to_string(),
            "trigger_count": live_triggers.len(),
            "connector_count": connector_runtime.providers.len(),
            "tls_enabled": listener.scheme() == "https",
            "shutdown_timeout_secs": shutdown_timeout.as_secs(),
            "drain_max_items": drain_config.max_items,
            "drain_deadline_secs": drain_config.deadline.as_secs(),
            "pump_max_outstanding": pump_config.max_outstanding,
        }),
    )
    .await?;

    let stranded = stranded_envelopes(&event_log, Duration::ZERO).await?;
    if !stranded.is_empty() {
        eprintln!(
            "[harn] startup found {} stranded inbox envelope(s); inspect with `harn orchestrator queue` and recover explicitly with `harn orchestrator recover --dry-run --envelope-age ...`",
            stranded.len()
        );
    }
    append_lifecycle_event(
        &event_log,
        "startup_stranded_envelopes",
        json!({
            "count": stranded.len(),
        }),
    )
    .await?;

    // Signal that the harness is ready.
    let _ = ready_tx.send(Ok(ReadyState {
        event_log: event_log.clone(),
        listener_url: listener.url(),
        local_addr: local_bind,
        state_dir: state_dir.clone(),
        admin_reload: admin_reload.clone(),
    }));

    // Wait for shutdown trigger or admin reload requests.
    let mut ctx = RuntimeCtx {
        role: config.role,
        config_path: &config_path,
        state_dir: &state_dir,
        bind: local_bind,
        startup_started_at: &startup_started_at,
        event_log: &event_log,
        event_log_description: &event_log_description,
        secret_chain_display: &secret_chain_display,
        listener: &listener,
        connectors: &mut connector_runtime,
        live_manifest: &mut live_manifest,
        live_triggers: &mut live_triggers,
        secret_provider: &secret_provider,
        metrics_registry: &metrics_registry,
        mcp_service: mcp_service.as_ref(),
        reload_rx: &mut reload_rx,
    };

    loop {
        tokio::select! {
            changed = shutdown_rx.changed() => {
                if changed.is_err() || *shutdown_rx.borrow() {
                    break;
                }
            }
            Some(request) = ctx.reload_rx.recv() => {
                handle_reload_request(&mut ctx, request).await?;
            }
        }
    }

    listener.mark_not_ready();
    let shutdown = graceful_shutdown(
        GracefulShutdownCtx {
            role: config.role,
            bind: local_bind,
            listener_url: listener.url(),
            config_path: &config_path,
            state_dir: &state_dir,
            startup_started_at: &startup_started_at,
            event_log: &event_log,
            event_log_description: &event_log_description,
            secret_chain_display: &secret_chain_display,
            triggers: &live_triggers,
            connectors: &connector_runtime,
            shutdown_timeout,
            drain_config,
        },
        listener,
        dispatcher,
        pending_pumps,
        cron_pump,
        inbox_pumps,
        waitpoint_pump,
        waitpoint_cancel_pump,
        waitpoint_sweeper,
    )
    .await;

    if let Some(obs) = observability {
        if let Err(error) = obs.shutdown() {
            if shutdown.is_ok() {
                return Err(OrchestratorError::Serve(error));
            }
            eprintln!("[harn] observability shutdown warning: {error}");
        }
    }
    harn_vm::clear_active_metrics_registry();
    shutdown
}

// ── Internal types ────────────────────────────────────────────────────────────

struct ConnectorRuntime {
    registry: harn_vm::ConnectorRegistry,
    trigger_registry: harn_vm::TriggerRegistry,
    handles: Vec<harn_vm::connectors::ConnectorHandle>,
    providers: Vec<String>,
    activations: Vec<harn_vm::ActivationHandle>,
    #[cfg_attr(not(unix), allow(dead_code))]
    provider_overrides: Vec<ResolvedProviderConnectorConfig>,
}

#[cfg_attr(not(unix), allow(dead_code))]
#[derive(Clone, Debug, Default, Serialize)]
struct ManifestReloadSummary {
    added: Vec<String>,
    modified: Vec<String>,
    removed: Vec<String>,
    unchanged: Vec<String>,
}

#[derive(Clone, Copy, Debug, PartialEq, Eq)]
enum PumpMode {
    Running,
    Draining(PumpDrainRequest),
}

#[derive(Clone, Copy, Debug, PartialEq, Eq)]
struct PumpDrainRequest {
    up_to: u64,
    config: DrainConfig,
    deadline: tokio::time::Instant,
}

#[derive(Clone, Copy, Debug, PartialEq, Eq)]
enum PumpDrainStopReason {
    Drained,
    MaxItems,
    Deadline,
    Error,
}

impl PumpDrainStopReason {
    fn as_str(self) -> &'static str {
        match self {
            Self::Drained => "drained",
            Self::MaxItems => "max_items",
            Self::Deadline => "deadline",
            Self::Error => "error",
        }
    }
}

#[derive(Clone, Copy, Debug, Default)]
struct PumpStats {
    last_seen: u64,
    processed: u64,
}

#[derive(Clone, Copy, Debug)]
struct PumpDrainProgress {
    request: PumpDrainRequest,
    start_seen: u64,
}

#[derive(Clone, Copy, Debug)]
struct PumpDrainReport {
    stats: PumpStats,
    drain_items: u64,
    remaining_queued: u64,
    outstanding_tasks: usize,
    stop_reason: PumpDrainStopReason,
}

impl PumpDrainReport {
    fn truncated(self) -> bool {
        self.remaining_queued > 0 || self.outstanding_tasks > 0
    }
}

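/// Control handle for a spawned topic pump: `drain` switches the pump into
/// drain mode and awaits its final report.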
struct PumpHandle {
    mode_tx: watch::Sender<PumpMode>,
    join: tokio::task::JoinHandle<Result<PumpDrainReport, OrchestratorError>>,
}

impl PumpHandle {
    async fn drain(
        self,
        log: &Arc<AnyEventLog>,
        topic_name: &str,
        up_to: u64,
        config: DrainConfig,
        overall_deadline: tokio::time::Instant,
    ) -> Result<PumpDrainReport, OrchestratorError> {
        let drain_deadline = std::cmp::min(
            tokio::time::Instant::now() + config.deadline,
            overall_deadline,
        );
        let _ = self.mode_tx.send(PumpMode::Draining(PumpDrainRequest {
            up_to,
            config,
            deadline: drain_deadline,
        }));
        append_pump_lifecycle_event(
            log,
            "pump_drain_started",
            json!({
                "topic": topic_name,
                "up_to": up_to,
                "max_items": config.max_items,
                "drain_deadline_ms": config.deadline.as_millis(),
            }),
        )
        .await?;
        match self.join.await {
            Ok(result) => result,
            Err(error) => Err(format!("pump task join failed: {error}").into()),
        }
    }
}

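/// Handle for the periodic waitpoint sweeper; `shutdown` signals it to stop
/// and awaits the background task.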
struct WaitpointSweepHandle {
    stop_tx: watch::Sender<bool>,
    join: tokio::task::JoinHandle<Result<(), OrchestratorError>>,
}

impl WaitpointSweepHandle {
    async fn shutdown(self) -> Result<(), OrchestratorError> {
        let _ = self.stop_tx.send(true);
        match self.join.await {
            Ok(result) => result,
            Err(error) => Err(format!("waitpoint sweeper join failed: {error}").into()),
        }
    }
}

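/// Payload shape of `trigger_event` records consumed from the pending-trigger
/// topics.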
#[derive(Debug, Deserialize)]
struct PendingTriggerRecord {
    trigger_id: String,
    binding_version: u32,
    event: harn_vm::TriggerEvent,
}

// ── Pump spawn functions ──────────────────────────────────────────────────────

fn spawn_pending_pump(
    event_log: Arc<harn_vm::event_log::AnyEventLog>,
    dispatcher: harn_vm::Dispatcher,
    pump_config: PumpConfig,
    metrics_registry: Arc<harn_vm::MetricsRegistry>,
    pump_drain_gate: PumpDrainGate,
    topic_name: &str,
) -> Result<PumpHandle, OrchestratorError> {
    let topic = harn_vm::event_log::Topic::new(topic_name).map_err(|error| error.to_string())?;
    spawn_topic_pump(
        event_log,
        topic,
        pump_config,
        metrics_registry,
        pump_drain_gate,
        move |logged| {
            let dispatcher = dispatcher.clone();
            async move {
                if pending_pump_test_should_fail() {
                    return Err("test pending pump failure".to_string().into());
                }
                if logged.kind != "trigger_event" {
                    return Ok(false);
                }
                let record: PendingTriggerRecord = serde_json::from_value(logged.payload)
                    .map_err(|error| format!("failed to decode pending trigger event: {error}"))?;
                dispatcher
                    .enqueue_targeted_with_headers(
                        Some(record.trigger_id),
                        Some(record.binding_version),
                        record.event,
                        Some(&logged.headers),
                    )
                    .await
                    .map_err(|error| format!("failed to enqueue pending trigger event: {error}"))?;
                Ok(true)
            }
        },
    )
}

fn spawn_cron_pump(
    event_log: Arc<harn_vm::event_log::AnyEventLog>,
    dispatcher: harn_vm::Dispatcher,
    pump_config: PumpConfig,
    metrics_registry: Arc<harn_vm::MetricsRegistry>,
    pump_drain_gate: PumpDrainGate,
) -> Result<PumpHandle, OrchestratorError> {
    let topic =
        harn_vm::event_log::Topic::new(CRON_TICK_TOPIC).map_err(|error| error.to_string())?;
    spawn_topic_pump(
        event_log,
        topic,
        pump_config,
        metrics_registry,
        pump_drain_gate,
        move |logged| {
            let dispatcher = dispatcher.clone();
            async move {
                if logged.kind != "trigger_event" {
                    return Ok(false);
                }
                let event: harn_vm::TriggerEvent = serde_json::from_value(logged.payload)
                    .map_err(|error| format!("failed to decode cron trigger event: {error}"))?;
                let trigger_id = match &event.provider_payload {
                    harn_vm::ProviderPayload::Known(
                        harn_vm::triggers::event::KnownProviderPayload::Cron(payload),
                    ) => payload.cron_id.clone(),
                    _ => None,
                };
                dispatcher
                    .enqueue_targeted_with_headers(trigger_id, None, event, Some(&logged.headers))
                    .await
                    .map_err(|error| format!("failed to enqueue cron trigger event: {error}"))?;
                Ok(true)
            }
        },
    )
}

fn spawn_inbox_pump(
    event_log: Arc<harn_vm::event_log::AnyEventLog>,
    dispatcher: harn_vm::Dispatcher,
    pump_config: PumpConfig,
    metrics_registry: Arc<harn_vm::MetricsRegistry>,
    topic_name: &str,
) -> Result<PumpHandle, OrchestratorError> {
    let topic = harn_vm::event_log::Topic::new(topic_name).map_err(|error| error.to_string())?;
    let consumer = pump_consumer_id(&topic)?;
    let inbox_task_release_file = inbox_task_test_release_file();
    let (mode_tx, mut mode_rx) = watch::channel(PumpMode::Running);
    let join = tokio::task::spawn_local(async move {
        let start_from = event_log
            .consumer_cursor(&topic, &consumer)
            .await
            .map_err(|error| format!("failed to read consumer cursor for {topic}: {error}"))?
            .or(event_log
                .latest(&topic)
                .await
                .map_err(|error| format!("failed to read topic head {topic}: {error}"))?);
        let mut stream = event_log
            .clone()
            .subscribe(&topic, start_from)
            .await
            .map_err(|error| format!("failed to subscribe topic {topic}: {error}"))?;
        let mut stats = PumpStats {
            last_seen: start_from.unwrap_or(0),
            processed: 0,
        };
        record_pump_metrics(&metrics_registry, &event_log, &topic, stats.last_seen, 0).await?;
        let mut drain_progress = None;
        let mut tasks = JoinSet::new();

        loop {
            if let Some(progress) = drain_progress {
                if let Some(report) = maybe_finish_pump_drain(stats, progress, tasks.len()) {
                    return Ok(report);
                }
            }

            let deadline = drain_progress.map(|progress| progress.request.deadline);
            let mut deadline_wait = Box::pin(async move {
                if let Some(deadline) = deadline {
                    tokio::time::sleep_until(deadline).await;
                } else {
                    std::future::pending::<()>().await;
                }
            });

            tokio::select! {
                changed = mode_rx.changed() => {
                    if changed.is_err() {
                        break;
                    }
                    if let PumpMode::Draining(request) = *mode_rx.borrow() {
                        drain_progress.get_or_insert(PumpDrainProgress {
                            request,
                            start_seen: stats.last_seen,
                        });
                    }
                }
                _ = &mut deadline_wait => {
                    if let Some(progress) = drain_progress {
                        return Ok(pump_drain_report(
                            stats,
                            progress.start_seen,
                            progress.request.up_to,
                            tasks.len(),
                            PumpDrainStopReason::Deadline,
                        ));
                    }
                }
                joined = tasks.join_next(), if !tasks.is_empty() => {
                    match joined {
                        Some(Ok(())) => {
                            record_pump_metrics(
                                &metrics_registry,
                                &event_log,
                                &topic,
                                stats.last_seen,
                                tasks.len(),
                            )
                            .await?;
                        }
                        Some(Err(error)) => {
                            return Err(format!("inbox dispatch task join failed: {error}").into());
                        }
                        None => {}
                    }
                }
                _ = tokio::time::sleep(Duration::from_millis(25)), if tasks.len() >= pump_config.max_outstanding => {
                    record_pump_metrics(&metrics_registry, &event_log, &topic, stats.last_seen, tasks.len()).await?;
                }
                received = stream.next(), if tasks.len() < pump_config.max_outstanding => {
                    let Some(received) = received else {
                        break;
                    };
                    let (event_id, logged) = received
                        .map_err(|error| format!("topic pump read failed for {topic}: {error}"))?;
                    if logged.kind != "event_ingested" {
                        stats.last_seen = event_id;
                        event_log
                            .ack(&topic, &consumer, event_id)
                            .await
                            .map_err(|error| format!("failed to ack topic pump cursor for {topic}: {error}"))?;
                        record_pump_metrics(&metrics_registry, &event_log, &topic, stats.last_seen, tasks.len()).await?;
                        continue;
                    }
                    append_pump_lifecycle_event(
                        &event_log,
                        "pump_received",
                        json!({
                            "topic": topic.as_str(),
                            "event_log_id": event_id,
                            "outstanding": tasks.len(),
                            "max_outstanding": pump_config.max_outstanding,
                        }),
                    )
                    .await?;
                    let envelope: harn_vm::triggers::dispatcher::InboxEnvelope =
                        serde_json::from_value(logged.payload)
                            .map_err(|error| format!("failed to decode dispatcher inbox event: {error}"))?;
                    let trigger_id = envelope.trigger_id.clone();
                    let binding_version = envelope.binding_version;
                    let trigger_event_id = envelope.event.id.0.clone();
                    let parent_headers = logged.headers.clone();
                    append_pump_lifecycle_event(
                        &event_log,
                        "pump_eligible",
                        json!({
                            "topic": topic.as_str(),
                            "event_log_id": event_id,
                            "trigger_id": trigger_id.clone(),
                            "binding_version": binding_version,
                            "trigger_event_id": trigger_event_id,
                        }),
                    )
                    .await?;
                    metrics_registry.record_orchestrator_pump_admission_delay(
                        topic.as_str(),
                        admission_delay(logged.occurred_at_ms),
                    );
                    append_pump_lifecycle_event(
                        &event_log,
                        "pump_admitted",
                        json!({
                            "topic": topic.as_str(),
                            "event_log_id": event_id,
                            "outstanding_after_admit": tasks.len() + 1,
                            "max_outstanding": pump_config.max_outstanding,
                            "trigger_id": trigger_id.clone(),
                            "binding_version": binding_version,
                            "trigger_event_id": trigger_event_id,
                        }),
                    )
                    .await?;
                    let dispatcher = dispatcher.clone();
                    let task_event_log = event_log.clone();
                    let task_topic = topic.as_str().to_string();
                    let inbox_task_release_file = inbox_task_release_file.clone();
                    tasks.spawn_local(async move {
                        if let Some(path) = inbox_task_release_file.as_ref() {
                            wait_for_test_release_file(path).await;
                        }
                        let _ = append_pump_lifecycle_event(
                            &task_event_log,
                            "pump_dispatch_started",
                            json!({
                                "topic": task_topic.clone(),
                                "event_log_id": event_id,
                                "trigger_id": trigger_id,
                                "binding_version": binding_version,
                                "trigger_event_id": trigger_event_id,
                            }),
                        )
                        .await;
                        let result = dispatcher
                            .dispatch_inbox_envelope_with_parent_headers(
                                envelope,
                                &parent_headers,
                            )
                            .await;
                        let (status, error_message) = match result {
                            Ok(_) => ("completed", None),
                            Err(error) => {
                                let message = error.to_string();
                                eprintln!("[harn] inbox dispatch warning: {message}");
                                ("failed", Some(message))
                            }
                        };
                        let _ = append_pump_lifecycle_event(
                            &task_event_log,
                            "pump_dispatch_completed",
                            json!({
                                "topic": task_topic,
                                "event_log_id": event_id,
                                "status": status,
                                "error": error_message,
                            }),
                        )
                        .await;
                    });
                    stats.last_seen = event_id;
                    stats.processed += 1;
                    event_log
                        .ack(&topic, &consumer, event_id)
                        .await
                        .map_err(|error| format!("failed to ack topic pump cursor for {topic}: {error}"))?;
                    append_pump_lifecycle_event(
                        &event_log,
                        "pump_acked",
                        json!({
                            "topic": topic.as_str(),
                            "event_log_id": event_id,
                            "cursor": event_id,
                        }),
                    )
                    .await?;
                    record_pump_metrics(&metrics_registry, &event_log, &topic, stats.last_seen, tasks.len()).await?;
                }
            }
        }

        while let Some(joined) = tasks.join_next().await {
            joined.map_err(|error| format!("inbox dispatch task join failed: {error}"))?;
            record_pump_metrics(
                &metrics_registry,
                &event_log,
                &topic,
                stats.last_seen,
                tasks.len(),
            )
            .await?;
        }

        Ok(drain_progress
            .map(|progress| {
                pump_drain_report(
                    stats,
                    progress.start_seen,
                    progress.request.up_to,
                    0,
                    PumpDrainStopReason::Drained,
                )
            })
            .unwrap_or_else(|| PumpDrainReport {
                stats,
                drain_items: 0,
                remaining_queued: 0,
                outstanding_tasks: 0,
                stop_reason: PumpDrainStopReason::Drained,
            }))
    });
    Ok(PumpHandle { mode_tx, join })
1296}
1297
1298fn spawn_waitpoint_resume_pump(
1299    event_log: Arc<harn_vm::event_log::AnyEventLog>,
1300    dispatcher: harn_vm::Dispatcher,
1301    pump_config: PumpConfig,
1302    metrics_registry: Arc<harn_vm::MetricsRegistry>,
1303    pump_drain_gate: PumpDrainGate,
1304) -> Result<PumpHandle, OrchestratorError> {
1305    let topic = harn_vm::event_log::Topic::new(harn_vm::WAITPOINT_RESUME_TOPIC)
1306        .map_err(|error| error.to_string())?;
1307    spawn_topic_pump(
1308        event_log,
1309        topic,
1310        pump_config,
1311        metrics_registry,
1312        pump_drain_gate,
1313        move |logged| {
1314            let dispatcher = dispatcher.clone();
1315            async move {
1316                harn_vm::process_waitpoint_resume_event(&dispatcher, logged)
1317                    .await
1318                    .map_err(OrchestratorError::from)
1319            }
1320        },
1321    )
1322}
1323
1324fn spawn_waitpoint_cancel_pump(
1325    event_log: Arc<harn_vm::event_log::AnyEventLog>,
1326    dispatcher: harn_vm::Dispatcher,
1327    pump_config: PumpConfig,
1328    metrics_registry: Arc<harn_vm::MetricsRegistry>,
1329    pump_drain_gate: PumpDrainGate,
1330) -> Result<PumpHandle, OrchestratorError> {
1331    let topic = harn_vm::event_log::Topic::new(harn_vm::TRIGGER_CANCEL_REQUESTS_TOPIC)
1332        .map_err(|error| error.to_string())?;
1333    spawn_topic_pump(
1334        event_log,
1335        topic,
1336        pump_config,
1337        metrics_registry,
1338        pump_drain_gate,
1339        move |logged| {
1340            let dispatcher = dispatcher.clone();
1341            async move {
1342                if logged.kind != "dispatch_cancel_requested" {
1343                    return Ok(false);
1344                }
1345                harn_vm::service_waitpoints_once(&dispatcher, None)
1346                    .await
1347                    .map_err(|error| {
1348                        OrchestratorError::Serve(format!(
1349                            "failed to service waitpoints on cancel: {error}"
1350                        ))
1351                    })?;
1352                Ok(true)
1353            }
1354        },
1355    )
1356}
1357
1358fn spawn_waitpoint_sweeper(dispatcher: harn_vm::Dispatcher) -> WaitpointSweepHandle {
1359    let (stop_tx, mut stop_rx) = watch::channel(false);
1360    let join = tokio::task::spawn_local(async move {
1361        let mut interval = tokio::time::interval(WAITPOINT_SERVICE_INTERVAL);
1362        interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
1363        loop {
1364            tokio::select! {
1365                changed = stop_rx.changed() => {
1366                    if changed.is_err() || *stop_rx.borrow() {
1367                        break;
1368                    }
1369                }
1370                _ = interval.tick() => {
1371                    harn_vm::service_waitpoints_once(&dispatcher, None)
1372                        .await
1373                        .map_err(|error| format!("failed to service waitpoints on sweep: {error}"))?;
1374                }
1375            }
1376        }
1377        Ok(())
1378    });
1379    WaitpointSweepHandle { stop_tx, join }
1380}
1381
1382fn spawn_topic_pump<F, Fut>(
1383    event_log: Arc<harn_vm::event_log::AnyEventLog>,
1384    topic: harn_vm::event_log::Topic,
1385    _pump_config: PumpConfig,
1386    metrics_registry: Arc<harn_vm::MetricsRegistry>,
1387    pump_drain_gate: PumpDrainGate,
1388    process: F,
1389) -> Result<PumpHandle, OrchestratorError>
1390where
1391    F: Fn(harn_vm::event_log::LogEvent) -> Fut + 'static,
1392    Fut: std::future::Future<Output = Result<bool, OrchestratorError>> + 'static,
1393{
1394    let consumer = pump_consumer_id(&topic)?;
1395    let mut pump_drain_gate_rx = pump_drain_gate.subscribe();
1396    let (mode_tx, mut mode_rx) = watch::channel(PumpMode::Running);
1397    let join = tokio::task::spawn_local(async move {
1398        let start_from = event_log
1399            .consumer_cursor(&topic, &consumer)
1400            .await
1401            .map_err(|error| format!("failed to read consumer cursor for {topic}: {error}"))?
1402            .or(event_log
1403                .latest(&topic)
1404                .await
1405                .map_err(|error| format!("failed to read topic head {topic}: {error}"))?);
1406        let mut stream = event_log
1407            .clone()
1408            .subscribe(&topic, start_from)
1409            .await
1410            .map_err(|error| format!("failed to subscribe topic {topic}: {error}"))?;
1411        let mut stats = PumpStats {
1412            last_seen: start_from.unwrap_or(0),
1413            processed: 0,
1414        };
1415        record_pump_metrics(&metrics_registry, &event_log, &topic, stats.last_seen, 0).await?;
1416        let mut drain_progress = None;
1417        loop {
1418            if let Some(progress) = drain_progress {
1419                if let Some(report) = maybe_finish_pump_drain(stats, progress, 0) {
1420                    return Ok(report);
1421                }
1422            }
1423            let deadline = drain_progress.map(|progress| progress.request.deadline);
1424            let mut deadline_wait = Box::pin(async move {
1425                if let Some(deadline) = deadline {
1426                    tokio::time::sleep_until(deadline).await;
1427                } else {
1428                    std::future::pending::<()>().await;
1429                }
1430            });
1431            tokio::select! {
1432                changed = mode_rx.changed() => {
1433                    if changed.is_err() {
1434                        break;
1435                    }
1436                    if let PumpMode::Draining(request) = *mode_rx.borrow() {
1437                        drain_progress.get_or_insert(PumpDrainProgress {
1438                            request,
1439                            start_seen: stats.last_seen,
1440                        });
1441                    }
1442                }
1443                _ = &mut deadline_wait => {
1444                    if let Some(progress) = drain_progress {
1445                        return Ok(pump_drain_report(
1446                            stats,
1447                            progress.start_seen,
1448                            progress.request.up_to,
1449                            0,
1450                            PumpDrainStopReason::Deadline,
1451                        ));
1452                    }
1453                }
1454                received = stream.next() => {
1455                    let Some(received) = received else {
1456                        break;
1457                    };
1458                    let (event_id, logged) = received
1459                        .map_err(|error| format!("topic pump read failed for {topic}: {error}"))?;
1460                    record_pump_metrics(&metrics_registry, &event_log, &topic, stats.last_seen, 1).await?;
1461                    metrics_registry.record_orchestrator_pump_admission_delay(
1462                        topic.as_str(),
1463                        admission_delay(logged.occurred_at_ms),
1464                    );
1465                    wait_for_pump_drain_release(
1466                        &event_log,
1467                        &topic,
1468                        event_id,
1469                        &mut pump_drain_gate_rx,
1470                    )
1471                    .await?;
1472                    let handled = process(logged).await?;
1473                    stats.last_seen = event_id;
1474                    if handled {
1475                        stats.processed += 1;
1476                    }
1477                    event_log
1478                        .ack(&topic, &consumer, event_id)
1479                        .await
1480                        .map_err(|error| format!("failed to ack topic pump cursor for {topic}: {error}"))?;
1481                    record_pump_metrics(&metrics_registry, &event_log, &topic, stats.last_seen, 0).await?;
1482                }
1483            }
1484        }
1485        Ok(drain_progress
1486            .map(|progress| {
1487                pump_drain_report(
1488                    stats,
1489                    progress.start_seen,
1490                    progress.request.up_to,
1491                    0,
1492                    PumpDrainStopReason::Drained,
1493                )
1494            })
1495            .unwrap_or_else(|| PumpDrainReport {
1496                stats,
1497                drain_items: 0,
1498                remaining_queued: 0,
1499                outstanding_tasks: 0,
1500                stop_reason: PumpDrainStopReason::Drained,
1501            }))
1502    });
1503    Ok(PumpHandle { mode_tx, join })
1504}
1505
1506// ── Graceful shutdown ─────────────────────────────────────────────────────────
1507
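/// Runs the graceful-shutdown sequence: stops the listener and connectors,
/// drains the topic pumps and dispatcher within the shutdown budget, emits
/// "draining"/"stopped" lifecycle events, and writes the final state snapshot.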
1508#[allow(clippy::too_many_arguments)]
1509async fn graceful_shutdown(
1510    ctx: GracefulShutdownCtx<'_>,
1511    listener: ListenerRuntime,
1512    dispatcher: harn_vm::Dispatcher,
1513    pending_pumps: Vec<(String, PumpHandle)>,
1514    cron_pump: PumpHandle,
1515    inbox_pumps: Vec<(String, PumpHandle)>,
1516    waitpoint_pump: PumpHandle,
1517    waitpoint_cancel_pump: PumpHandle,
1518    waitpoint_sweeper: WaitpointSweepHandle,
1519) -> Result<(), OrchestratorError> {
1520    eprintln!("[harn] signal received, starting graceful shutdown...");
1521    tracing::info!(
1522        component = "orchestrator",
1523        trace_id = "",
1524        shutdown_timeout_secs = ctx.shutdown_timeout.as_secs(),
1525        "signal received, starting graceful shutdown"
1526    );
1527    let listener_in_flight = listener
1528        .trigger_metrics()
1529        .into_values()
1530        .map(|metrics| metrics.in_flight)
1531        .sum::<u64>();
1532    let dispatcher_before = dispatcher.snapshot();
1533    append_lifecycle_event(
1534        ctx.event_log,
1535        "draining",
1536        json!({
1537            "bind": ctx.bind.to_string(),
1538            "role": ctx.role.as_str(),
1539            "status": "draining",
1540            "http_in_flight": listener_in_flight,
1541            "dispatcher_in_flight": dispatcher_before.in_flight,
1542            "dispatcher_retry_queue_depth": dispatcher_before.retry_queue_depth,
1543            "dispatcher_dlq_depth": dispatcher_before.dlq_depth,
1544            "shutdown_timeout_secs": ctx.shutdown_timeout.as_secs(),
1545            "drain_max_items": ctx.drain_config.max_items,
1546            "drain_deadline_secs": ctx.drain_config.deadline.as_secs(),
1547        }),
1548    )
1549    .await?;
1550
1551    let deadline = tokio::time::Instant::now() + ctx.shutdown_timeout;
1552    let listener_metrics = listener.shutdown(remaining_budget(deadline)).await?;
1553    for handle in &ctx.connectors.handles {
1554        let connector = handle.lock().await;
1555        if let Err(error) = connector.shutdown(remaining_budget(deadline)).await {
1556            eprintln!(
1557                "[harn] connector {} shutdown warning: {error}",
1558                connector.provider_id().as_str()
1559            );
1560        }
1561    }
1562
1563    let mut pending_processed = 0;
1564    for (topic_name, pump) in pending_pumps {
1565        let stats =
1566            drain_pump_best_effort(ctx.event_log, &topic_name, pump, ctx.drain_config, deadline)
1567                .await?;
1568        pending_processed += stats.stats.processed;
1569        emit_drain_truncated(ctx.event_log, &topic_name, stats, ctx.drain_config).await?;
1570    }
1571    let cron_stats = drain_pump_best_effort(
1572        ctx.event_log,
1573        CRON_TICK_TOPIC,
1574        cron_pump,
1575        ctx.drain_config,
1576        deadline,
1577    )
1578    .await?;
1579    emit_drain_truncated(ctx.event_log, CRON_TICK_TOPIC, cron_stats, ctx.drain_config).await?;
1580    let mut inbox_processed = 0;
1581    for (topic_name, pump) in inbox_pumps {
1582        let stats =
1583            drain_pump_best_effort(ctx.event_log, &topic_name, pump, ctx.drain_config, deadline)
1584                .await?;
1585        inbox_processed += stats.stats.processed;
1586        emit_drain_truncated(ctx.event_log, &topic_name, stats, ctx.drain_config).await?;
1587    }
1588    let waitpoint_stats = waitpoint_pump
1589        .drain(
1590            ctx.event_log,
1591            harn_vm::WAITPOINT_RESUME_TOPIC,
1592            topic_latest_id(ctx.event_log, harn_vm::WAITPOINT_RESUME_TOPIC).await?,
1593            ctx.drain_config,
1594            deadline,
1595        )
1596        .await?;
1597    emit_drain_truncated(
1598        ctx.event_log,
1599        harn_vm::WAITPOINT_RESUME_TOPIC,
1600        waitpoint_stats,
1601        ctx.drain_config,
1602    )
1603    .await?;
1604    let waitpoint_cancel_stats = waitpoint_cancel_pump
1605        .drain(
1606            ctx.event_log,
1607            harn_vm::TRIGGER_CANCEL_REQUESTS_TOPIC,
1608            topic_latest_id(ctx.event_log, harn_vm::TRIGGER_CANCEL_REQUESTS_TOPIC).await?,
1609            ctx.drain_config,
1610            deadline,
1611        )
1612        .await?;
1613    emit_drain_truncated(
1614        ctx.event_log,
1615        harn_vm::TRIGGER_CANCEL_REQUESTS_TOPIC,
1616        waitpoint_cancel_stats,
1617        ctx.drain_config,
1618    )
1619    .await?;
1620    waitpoint_sweeper.shutdown().await?;
1621    let drain_report = dispatcher
1622        .drain(remaining_budget(deadline))
1623        .await
1624        .map_err(|error| format!("failed to drain dispatcher: {error}"))?;
1625
1626    let stopped_at = now_rfc3339()?;
1627    let timed_out = !drain_report.drained;
1628    if timed_out {
1629        dispatcher.shutdown();
1630    }
1631    append_lifecycle_event(
1632        ctx.event_log,
1633        "stopped",
1634        json!({
1635            "bind": ctx.bind.to_string(),
1636            "role": ctx.role.as_str(),
1637            "status": "stopped",
1638            "http_in_flight": listener_in_flight,
1639            "dispatcher_in_flight": drain_report.in_flight,
1640            "dispatcher_retry_queue_depth": drain_report.retry_queue_depth,
1641            "dispatcher_dlq_depth": drain_report.dlq_depth,
1642            "pending_events_drained": pending_processed,
1643            "cron_events_drained": cron_stats.stats.processed,
1644            "inbox_events_drained": inbox_processed,
1645            "waitpoint_events_drained": waitpoint_stats.stats.processed,
1646            "waitpoint_cancel_events_drained": waitpoint_cancel_stats.stats.processed,
1647            "timed_out": timed_out,
1648        }),
1649    )
1650    .await?;
1651    ctx.event_log
1652        .flush()
1653        .await
1654        .map_err(|error| format!("failed to flush event log: {error}"))?;
1655
1656    write_state_snapshot(
1657        &ctx.state_dir.join(STATE_SNAPSHOT_FILE),
1658        &ServeStateSnapshot {
1659            status: "stopped".to_string(),
1660            role: ctx.role.as_str().to_string(),
1661            bind: ctx.bind.to_string(),
1662            listener_url: ctx.listener_url.clone(),
1663            manifest_path: ctx.config_path.display().to_string(),
1664            state_dir: ctx.state_dir.display().to_string(),
1665            started_at: ctx.startup_started_at.to_string(),
1666            stopped_at: Some(stopped_at),
1667            secret_provider_chain: ctx.secret_chain_display.to_string(),
1668            event_log_backend: ctx.event_log_description.backend.to_string(),
1669            event_log_location: ctx
1670                .event_log_description
1671                .location
1672                .as_ref()
1673                .map(|path| path.display().to_string()),
1674            triggers: trigger_state_snapshots(ctx.triggers, &listener_metrics),
1675            connectors: ctx.connectors.providers.clone(),
1676            activations: ctx
1677                .connectors
1678                .activations
1679                .iter()
1680                .map(|activation| ConnectorActivationSnapshot {
1681                    provider: activation.provider.as_str().to_string(),
1682                    binding_count: activation.binding_count,
1683                })
1684                .collect(),
1685        },
1686    )?;
1687
1688    if timed_out {
1689        eprintln!(
1690            "[harn] graceful shutdown timed out with {} dispatches and {} retry waits remaining",
1691            drain_report.in_flight, drain_report.retry_queue_depth
1692        );
1693    }
1694    eprintln!("[harn] graceful shutdown complete");
1695    tracing::info!(
1696        component = "orchestrator",
1697        trace_id = "",
1698        "graceful shutdown complete"
1699    );
1700    Ok(())
1701}
1702
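/// Borrowed context passed into `graceful_shutdown`.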
1703struct GracefulShutdownCtx<'a> {
1704    role: OrchestratorRole,
1705    bind: SocketAddr,
1706    listener_url: String,
1707    config_path: &'a Path,
1708    state_dir: &'a Path,
1709    startup_started_at: &'a str,
1710    event_log: &'a Arc<AnyEventLog>,
1711    event_log_description: &'a harn_vm::event_log::EventLogDescription,
1712    secret_chain_display: &'a str,
1713    triggers: &'a [CollectedManifestTrigger],
1714    connectors: &'a ConnectorRuntime,
1715    shutdown_timeout: Duration,
1716    drain_config: DrainConfig,
1717}
1718
1719// ── Manifest reload ───────────────────────────────────────────────────────────
1720
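/// Mutable runtime state shared with manifest reload handling.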
1721struct RuntimeCtx<'a> {
1722    role: OrchestratorRole,
1723    config_path: &'a Path,
1724    state_dir: &'a Path,
1725    bind: SocketAddr,
1726    startup_started_at: &'a str,
1727    event_log: &'a Arc<AnyEventLog>,
1728    event_log_description: &'a harn_vm::event_log::EventLogDescription,
1729    secret_chain_display: &'a str,
1730    listener: &'a ListenerRuntime,
1731    connectors: &'a mut ConnectorRuntime,
1732    live_manifest: &'a mut Manifest,
1733    live_triggers: &'a mut Vec<CollectedManifestTrigger>,
1734    secret_provider: &'a Arc<dyn harn_vm::secrets::SecretProvider>,
1735    metrics_registry: &'a Arc<harn_vm::MetricsRegistry>,
1736    mcp_service: Option<&'a Arc<crate::commands::mcp::serve::McpOrchestratorService>>,
1737    #[cfg_attr(not(unix), allow(dead_code))]
1738    reload_rx: &'a mut mpsc::UnboundedReceiver<AdminReloadRequest>,
1739}
1740
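/// Handles an admin reload request: applies the manifest reload, refreshes
/// the state snapshot, records the outcome on the manifest topic, and replies
/// on the request's response channel when one is present.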
1741#[cfg_attr(not(unix), allow(dead_code))]
1742async fn handle_reload_request(
1743    ctx: &mut RuntimeCtx<'_>,
1744    request: AdminReloadRequest,
1745) -> Result<(), OrchestratorError> {
1746    let source = request.source.clone();
1747    match reload_manifest(ctx).await {
1748        Ok(summary) => {
1749            if let Some(mcp_service) = ctx.mcp_service {
1750                mcp_service.notify_manifest_reloaded();
1751            }
1752            write_running_state_snapshot(ctx)?;
1753            append_manifest_event(
1754                ctx.event_log,
1755                "reload_succeeded",
1756                json!({
1757                    "source": source,
1758                    "summary": summary,
1759                }),
1760            )
1761            .await?;
1762            eprintln!(
1763                "[harn] manifest reload ({source}) applied: +{} ~{} -{}",
1764                summary.added.len(),
1765                summary.modified.len(),
1766                summary.removed.len()
1767            );
1768            if let Some(response_tx) = request.response_tx {
1769                let _ = response_tx.send(serde_json::to_value(&summary).map_err(|error| {
1770                    OrchestratorError::Serve(format!("failed to encode reload summary: {error}"))
1771                }));
1772            }
1773        }
1774        Err(error) => {
1775            eprintln!("[harn] manifest reload ({source}) failed: {error}");
1776            append_manifest_event(
1777                ctx.event_log,
1778                "reload_failed",
1779                json!({
1780                    "source": source,
1781                    "error": error.to_string(),
1782                }),
1783            )
1784            .await?;
1785            if let Some(response_tx) = request.response_tx {
1786                let _ = response_tx.send(Err(error));
1787            }
1788        }
1789    }
1790    Ok(())
1791}
1792
1793#[cfg_attr(not(unix), allow(dead_code))]
1794fn write_running_state_snapshot(ctx: &RuntimeCtx<'_>) -> Result<(), OrchestratorError> {
1795    let listener_metrics = ctx.listener.trigger_metrics();
1796    write_state_snapshot(
1797        &ctx.state_dir.join(STATE_SNAPSHOT_FILE),
1798        &ServeStateSnapshot {
1799            status: "running".to_string(),
1800            role: ctx.role.as_str().to_string(),
1801            bind: ctx.bind.to_string(),
1802            listener_url: ctx.listener.url(),
1803            manifest_path: ctx.config_path.display().to_string(),
1804            state_dir: ctx.state_dir.display().to_string(),
1805            started_at: ctx.startup_started_at.to_string(),
1806            stopped_at: None,
1807            secret_provider_chain: ctx.secret_chain_display.to_string(),
1808            event_log_backend: ctx.event_log_description.backend.to_string(),
1809            event_log_location: ctx
1810                .event_log_description
1811                .location
1812                .as_ref()
1813                .map(|path| path.display().to_string()),
1814            triggers: trigger_state_snapshots(ctx.live_triggers, &listener_metrics),
1815            connectors: ctx.connectors.providers.clone(),
1816            activations: ctx
1817                .connectors
1818                .activations
1819                .iter()
1820                .map(|activation| ConnectorActivationSnapshot {
1821                    provider: activation.provider.as_str().to_string(),
1822                    binding_count: activation.binding_count,
1823                })
1824                .collect(),
1825        },
1826    )
1827}
1828
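/// Reloads the manifest from disk: re-collects triggers, rebuilds the
/// connector runtime when provider fingerprints changed, reinstalls trigger
/// bindings and listener routes, and rolls back to the previous manifest if
/// route installation fails.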
1829#[cfg_attr(not(unix), allow(dead_code))]
1830async fn reload_manifest(
1831    ctx: &mut RuntimeCtx<'_>,
1832) -> Result<ManifestReloadSummary, OrchestratorError> {
1833    let (manifest, manifest_dir) = load_manifest(ctx.config_path)?;
1834    let mut vm = ctx
1835        .role
1836        .build_vm(&manifest_dir, &manifest_dir, ctx.state_dir)?;
1837    let extensions = package::load_runtime_extensions(ctx.config_path);
1838    let collected_triggers = package::collect_manifest_triggers(&mut vm, &extensions)
1839        .await
1840        .map_err(|error| format!("failed to collect manifest triggers: {error}"))?;
1841    let summary = summarize_manifest_reload(ctx.live_triggers, &collected_triggers);
1842    let connector_reload =
1843        connector_reload_fingerprint_map(ctx.live_triggers, &ctx.connectors.provider_overrides)
1844            != connector_reload_fingerprint_map(
1845                &collected_triggers,
1846                &extensions.provider_connectors,
1847            );
1848    let next_connector_runtime = if connector_reload {
1849        let mut runtime = initialize_connectors(
1850            &collected_triggers,
1851            ctx.event_log.clone(),
1852            ctx.secret_provider.clone(),
1853            ctx.metrics_registry.clone(),
1854            &extensions.provider_connectors,
1855        )
1856        .await?;
1857        runtime.activations = runtime
1858            .registry
1859            .activate_all(&runtime.trigger_registry)
1860            .await
1861            .map_err(|error| error.to_string())?;
1862        Some(runtime)
1863    } else {
1864        None
1865    };
1866    let previous_manifest = ctx.live_manifest.clone();
1867    let previous_triggers = ctx.live_triggers.clone();
1868    package::install_collected_manifest_triggers(&collected_triggers).await?;
1869    apply_supervisor_state(ctx.state_dir).await?;
1870    let binding_versions = live_manifest_binding_versions();
1871    let route_registry = next_connector_runtime
1872        .as_ref()
1873        .map(|runtime| &runtime.registry)
1874        .unwrap_or(&ctx.connectors.registry);
1875    let route_overrides = next_connector_runtime
1876        .as_ref()
1877        .map(|runtime| runtime.provider_overrides.as_slice())
1878        .unwrap_or(ctx.connectors.provider_overrides.as_slice());
1879    let route_configs = match build_route_configs(&collected_triggers, &binding_versions)
1880        .and_then(|routes| attach_route_connectors(routes, route_registry, route_overrides))
1881    {
1882        Ok(routes) => routes,
1883        Err(error) => {
1884            rollback_manifest_reload(ctx, &previous_manifest, &previous_triggers)
1885                .await
1886                .map_err(|rollback| format!("{error}; rollback failed: {rollback}"))?;
1887            return Err(error);
1888        }
1889    };
1890    if let Err(error) = ctx.listener.reload_routes(route_configs) {
1891        rollback_manifest_reload(ctx, &previous_manifest, &previous_triggers)
1892            .await
1893            .map_err(|rollback| format!("{error}; rollback failed: {rollback}"))?;
1894        return Err(error);
1895    }
1896    if let Some(runtime) = next_connector_runtime {
1897        let previous_handles = ctx.connectors.handles.clone();
1898        let connector_clients = runtime.registry.client_map().await;
1899        harn_vm::install_active_connector_clients(connector_clients);
1900        *ctx.connectors = runtime;
1901        for handle in previous_handles {
1902            let connector = handle.lock().await;
1903            if let Err(error) = connector.shutdown(Duration::from_secs(5)).await {
1904                eprintln!(
1905                    "[harn] connector {} reload shutdown warning: {error}",
1906                    connector.provider_id().as_str()
1907                );
1908            }
1909        }
1910    }
1911    *ctx.live_manifest = manifest;
1912    *ctx.live_triggers = collected_triggers;
1913    Ok(summary)
1914}
1915
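/// Restores the previously installed triggers, routes, and manifest after a
/// failed reload.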
1916#[cfg_attr(not(unix), allow(dead_code))]
1917async fn rollback_manifest_reload(
1918    ctx: &mut RuntimeCtx<'_>,
1919    previous_manifest: &Manifest,
1920    previous_triggers: &[CollectedManifestTrigger],
1921) -> Result<(), OrchestratorError> {
1922    package::install_collected_manifest_triggers(previous_triggers).await?;
1923    apply_supervisor_state(ctx.state_dir).await?;
1924    let binding_versions = live_manifest_binding_versions();
1925    let route_configs = build_route_configs(previous_triggers, &binding_versions)?;
1926    let route_configs = attach_route_connectors(
1927        route_configs,
1928        &ctx.connectors.registry,
1929        &ctx.connectors.provider_overrides,
1930    )?;
1931    ctx.listener.reload_routes(route_configs)?;
1932    *ctx.live_manifest = previous_manifest.clone();
1933    *ctx.live_triggers = previous_triggers.to_vec();
1934    Ok(())
1935}
1936
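/// Diffs trigger fingerprints between the current and next manifests into
/// added/modified/removed/unchanged buckets.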
1937#[cfg_attr(not(unix), allow(dead_code))]
1938fn summarize_manifest_reload(
1939    current: &[CollectedManifestTrigger],
1940    next: &[CollectedManifestTrigger],
1941) -> ManifestReloadSummary {
1942    let current_map = trigger_fingerprint_map(current, true);
1943    let next_map = trigger_fingerprint_map(next, true);
1944    let mut summary = ManifestReloadSummary::default();
1945    let ids: BTreeSet<String> = current_map.keys().chain(next_map.keys()).cloned().collect();
1946    for id in ids {
1947        match (current_map.get(&id), next_map.get(&id)) {
1948            (None, Some(_)) => summary.added.push(id),
1949            (Some(_), None) => summary.removed.push(id),
1950            (Some(left), Some(right)) if left == right => summary.unchanged.push(id),
1951            (Some(_), Some(_)) => summary.modified.push(id),
1952            (None, None) => {}
1953        }
1954    }
1955    summary
1956}
1957
1958#[cfg_attr(not(unix), allow(dead_code))]
1959fn trigger_fingerprint_map(
1960    triggers: &[CollectedManifestTrigger],
1961    include_http_managed: bool,
1962) -> BTreeMap<String, String> {
1963    triggers
1964        .iter()
1965        .filter(|trigger| include_http_managed || !is_http_managed_trigger(trigger))
1966        .map(|trigger| {
1967            let spec = package::manifest_trigger_binding_spec(trigger.clone());
1968            (trigger.config.id.clone(), spec.definition_fingerprint)
1969        })
1970        .collect()
1971}
1972
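/// Builds a provider-to-fingerprint map from trigger bindings and provider
/// overrides (skipping HTTP-managed triggers whose provider does not own
/// ingress); compared across reloads to decide whether connectors must be
/// rebuilt.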
1973#[cfg_attr(not(unix), allow(dead_code))]
1974fn connector_reload_fingerprint_map(
1975    triggers: &[CollectedManifestTrigger],
1976    provider_overrides: &[ResolvedProviderConnectorConfig],
1977) -> BTreeMap<String, Vec<String>> {
1978    let mut by_provider = BTreeMap::<String, Vec<String>>::new();
1979    for trigger in triggers {
1980        let provider = trigger.config.provider.as_str().to_string();
1981        if !connector_owns_ingress(&provider, provider_overrides)
1982            && matches!(
1983                trigger.config.kind,
1984                crate::package::TriggerKind::Webhook | crate::package::TriggerKind::A2aPush
1985            )
1986        {
1987            continue;
1988        }
1989        let spec = package::manifest_trigger_binding_spec(trigger.clone());
1990        by_provider
1991            .entry(provider)
1992            .or_default()
1993            .push(spec.definition_fingerprint);
1994    }
1995    for override_config in provider_overrides {
1996        by_provider
1997            .entry(override_config.id.as_str().to_string())
1998            .or_default()
1999            .push(provider_connector_fingerprint(override_config));
2000    }
2001    for fingerprints in by_provider.values_mut() {
2002        fingerprints.sort();
2003    }
2004    by_provider
2005}
2006
2007#[cfg_attr(not(unix), allow(dead_code))]
2008fn provider_connector_fingerprint(config: &ResolvedProviderConnectorConfig) -> String {
2009    match &config.connector {
2010        ResolvedProviderConnectorKind::RustBuiltin => format!(
2011            "{}::builtin@{}",
2012            config.id.as_str(),
2013            config.manifest_dir.display()
2014        ),
2015        ResolvedProviderConnectorKind::Harn { module } => format!(
2016            "{}::harn:{}@{}",
2017            config.id.as_str(),
2018            module,
2019            config.manifest_dir.display()
2020        ),
2021        ResolvedProviderConnectorKind::Invalid(message) => format!(
2022            "{}::invalid:{}@{}",
2023            config.id.as_str(),
2024            message,
2025            config.manifest_dir.display()
2026        ),
2027    }
2028}
2029
2030#[cfg_attr(not(unix), allow(dead_code))]
2031fn is_http_managed_trigger(trigger: &CollectedManifestTrigger) -> bool {
2032    matches!(
2033        trigger.config.kind,
2034        crate::package::TriggerKind::Webhook | crate::package::TriggerKind::A2aPush
2035    )
2036}
2037
2038// ── Connector helpers ─────────────────────────────────────────────────────────
2039
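/// Builds the connector and trigger registries for the collected triggers:
/// registers provider overrides or built-in/placeholder connectors as
/// needed, then initializes every connector with a shared `ConnectorCtx`.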
2040async fn initialize_connectors(
2041    triggers: &[CollectedManifestTrigger],
2042    event_log: Arc<harn_vm::event_log::AnyEventLog>,
2043    secrets: Arc<dyn harn_vm::secrets::SecretProvider>,
2044    metrics: Arc<harn_vm::MetricsRegistry>,
2045    provider_overrides: &[ResolvedProviderConnectorConfig],
2046) -> Result<ConnectorRuntime, OrchestratorError> {
2047    let mut registry = harn_vm::ConnectorRegistry::default();
2048    let mut trigger_registry = harn_vm::TriggerRegistry::default();
2049    let mut grouped_kinds: BTreeMap<harn_vm::ProviderId, BTreeSet<String>> = BTreeMap::new();
2050
2051    for trigger in triggers {
2052        let binding = trigger_binding_for(&trigger.config)?;
2053        grouped_kinds
2054            .entry(binding.provider.clone())
2055            .or_default()
2056            .insert(binding.kind.as_str().to_string());
2057        trigger_registry.register(binding);
2058    }
2059
2060    let ctx = harn_vm::ConnectorCtx {
2061        inbox: Arc::new(
2062            harn_vm::InboxIndex::new(event_log.clone(), metrics.clone())
2063                .await
2064                .map_err(|error| error.to_string())?,
2065        ),
2066        event_log,
2067        secrets,
2068        metrics,
2069        rate_limiter: Arc::new(harn_vm::RateLimiterFactory::default()),
2070    };
2071
2072    let mut providers = Vec::new();
2073    let mut handles = Vec::new();
2074    for (provider, kinds) in grouped_kinds {
2075        let provider_name = provider.as_str().to_string();
2076        if let Some(connector) = connector_override_for(&provider, provider_overrides).await? {
2077            registry.remove(&provider);
2078            registry
2079                .register(connector)
2080                .map_err(|error| error.to_string())?;
2081        }
2082        if registry.get(&provider).is_none() {
2083            if provider_requires_harn_connector(provider.as_str()) {
2084                return Err(format!(
2085                    "provider '{}' is package-backed; add [[providers]] id = \"{}\" with \
2086                     connector = {{ harn = \"...\" }} to the manifest",
2087                    provider.as_str(),
2088                    provider.as_str()
2089                )
2090                .into());
2091            }
2092            let connector = connector_for(&provider, kinds);
2093            registry
2094                .register(connector)
2095                .map_err(|error| error.to_string())?;
2096        }
2097        let handle = registry
2098            .get(&provider)
2099            .ok_or_else(|| format!("connector registry lost provider '{}'", provider.as_str()))?;
2100        handle
2101            .lock()
2102            .await
2103            .init(ctx.clone())
2104            .await
2105            .map_err(|error| error.to_string())?;
2106        handles.push(handle.clone());
2107        providers.push(provider_name);
2108    }
2109
2110    Ok(ConnectorRuntime {
2111        registry,
2112        trigger_registry,
2113        handles,
2114        providers,
2115        activations: Vec::new(),
2116        provider_overrides: provider_overrides.to_vec(),
2117    })
2118}
2119
2120fn trigger_binding_for(
2121    config: &ResolvedTriggerConfig,
2122) -> Result<harn_vm::TriggerBinding, OrchestratorError> {
2123    Ok(harn_vm::TriggerBinding {
2124        provider: config.provider.clone(),
2125        kind: harn_vm::TriggerKind::from(trigger_kind_name(config.kind)),
2126        binding_id: config.id.clone(),
2127        dedupe_key: config.dedupe_key.clone(),
2128        dedupe_retention_days: config.retry.retention_days,
2129        config: connector_binding_config(config)?,
2130    })
2131}
2132
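/// Encodes the kind-specific portion of a trigger config into the JSON shape
/// handed to the connector for that trigger kind.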
2133fn connector_binding_config(
2134    config: &ResolvedTriggerConfig,
2135) -> Result<JsonValue, OrchestratorError> {
2136    match config.kind {
2137        crate::package::TriggerKind::Cron => {
2138            serde_json::to_value(&config.kind_specific).map_err(|error| {
2139                OrchestratorError::Serve({
2140                    format!(
2141                        "failed to encode cron trigger config '{}': {error}",
2142                        config.id
2143                    )
2144                })
2145            })
2146        }
2147        crate::package::TriggerKind::Webhook => Ok(serde_json::json!({
2148            "match": config.match_,
2149            "secrets": config.secrets,
2150            "webhook": config.kind_specific,
2151        })),
2152        crate::package::TriggerKind::Poll => Ok(serde_json::json!({
2153            "match": config.match_,
2154            "secrets": config.secrets,
2155            "poll": config.kind_specific,
2156        })),
2157        crate::package::TriggerKind::Stream => Ok(serde_json::json!({
2158            "match": config.match_,
2159            "secrets": config.secrets,
2160            "stream": config.kind_specific,
2161            "window": config.window,
2162        })),
2163        crate::package::TriggerKind::A2aPush => Ok(serde_json::json!({
2164            "match": config.match_,
2165            "secrets": config.secrets,
2166            "a2a_push": a2a_push_connector_config(&config.kind_specific)?,
2167        })),
2168        _ => Ok(JsonValue::Null),
2169    }
2170}
2171
2172fn a2a_push_connector_config(
2173    kind_specific: &BTreeMap<String, toml::Value>,
2174) -> Result<JsonValue, OrchestratorError> {
2175    if let Some(nested) = kind_specific.get("a2a_push") {
2176        return serde_json::to_value(nested).map_err(|error| {
2177            OrchestratorError::Serve(format!("failed to encode a2a_push trigger config: {error}"))
2178        });
2179    }
2180    let filtered = kind_specific
2181        .iter()
2182        .filter(|(key, _)| key.as_str() != "path")
2183        .map(|(key, value)| (key.clone(), value.clone()))
2184        .collect::<BTreeMap<_, _>>();
2185    serde_json::to_value(filtered).map_err(|error| {
2186        OrchestratorError::Serve(format!("failed to encode a2a_push trigger config: {error}"))
2187    })
2188}
2189
2190fn connector_for(
2191    provider: &harn_vm::ProviderId,
2192    kinds: BTreeSet<String>,
2193) -> Box<dyn harn_vm::Connector> {
2194    match provider.as_str() {
2195        "cron" => Box::new(harn_vm::CronConnector::new()),
2196        _ => Box::new(PlaceholderConnector::new(provider.clone(), kinds)),
2197    }
2198}
2199
2200async fn connector_override_for(
2201    provider: &harn_vm::ProviderId,
2202    provider_overrides: &[ResolvedProviderConnectorConfig],
2203) -> Result<Option<Box<dyn harn_vm::Connector>>, OrchestratorError> {
2204    let Some(override_config) = provider_overrides
2205        .iter()
2206        .find(|entry| entry.id == *provider)
2207    else {
2208        return Ok(None);
2209    };
2210    match &override_config.connector {
2211        ResolvedProviderConnectorKind::RustBuiltin => Ok(None),
2212        ResolvedProviderConnectorKind::Invalid(message) => {
2213            Err(OrchestratorError::Serve(message.clone()))
2214        }
2215        ResolvedProviderConnectorKind::Harn { module } => {
2216            let module_path =
2217                harn_vm::resolve_module_import_path(&override_config.manifest_dir, module);
2218            let connector = harn_vm::HarnConnector::load(&module_path)
2219                .await
2220                .map_err(|error| {
2221                    format!(
2222                        "failed to load Harn connector '{}' for provider '{}': {error}",
2223                        module_path.display(),
2224                        provider.as_str()
2225                    )
2226                })?;
2227            Ok(Some(Box::new(connector)))
2228        }
2229    }
2230}
2231
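/// Builds listener route configs for the collected triggers, failing when a
/// trigger lacks an active manifest binding or two routes share a path.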
2232fn build_route_configs(
2233    triggers: &[CollectedManifestTrigger],
2234    binding_versions: &BTreeMap<String, u32>,
2235) -> Result<Vec<RouteConfig>, OrchestratorError> {
2236    let mut seen_paths = BTreeSet::new();
2237    let mut routes = Vec::new();
2238    for trigger in triggers {
2239        let Some(binding_version) = binding_versions.get(&trigger.config.id).copied() else {
2240            return Err(format!(
2241                "trigger registry is missing active manifest binding '{}'",
2242                trigger.config.id
2243            )
2244            .into());
2245        };
2246        if let Some(route) = RouteConfig::from_trigger(trigger, binding_version)? {
2247            if !seen_paths.insert(route.path.clone()) {
2248                return Err(format!(
2249                    "trigger route '{}' is configured more than once",
2250                    route.path
2251                )
2252                .into());
2253            }
2254            routes.push(route);
2255        }
2256    }
2257    Ok(routes)
2258}
2259
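/// Attaches the registered connector handle to every route that requests
/// connector ingress or whose provider override owns ingress.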
2260fn attach_route_connectors(
2261    routes: Vec<RouteConfig>,
2262    registry: &harn_vm::ConnectorRegistry,
2263    provider_overrides: &[ResolvedProviderConnectorConfig],
2264) -> Result<Vec<RouteConfig>, OrchestratorError> {
2265    routes
2266        .into_iter()
2267        .map(|mut route| {
2268            if route.connector_ingress
2269                || connector_owns_ingress(route.provider.as_str(), provider_overrides)
2270            {
2271                route.connector = Some(registry.get(&route.provider).ok_or_else(|| {
2272                    format!(
2273                        "connector registry is missing provider '{}'",
2274                        route.provider.as_str()
2275                    )
2276                })?);
2277            }
2278            Ok(route)
2279        })
2280        .collect()
2281}
2282
2283fn connector_owns_ingress(
2284    provider: &str,
2285    provider_overrides: &[ResolvedProviderConnectorConfig],
2286) -> bool {
2287    provider_overrides.iter().any(|entry| {
2288        entry.id.as_str() == provider
2289            && matches!(entry.connector, ResolvedProviderConnectorKind::Harn { .. })
2290    })
2291}
2292
2293fn provider_requires_harn_connector(provider: &str) -> bool {
2294    harn_vm::provider_metadata(provider).is_some_and(|metadata| {
2295        matches!(
2296            metadata.runtime,
2297            harn_vm::ProviderRuntimeMetadata::Placeholder
2298        )
2299    })
2300}
2301
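/// Returns the highest non-terminated binding version for every
/// manifest-sourced trigger binding.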
2302fn live_manifest_binding_versions() -> BTreeMap<String, u32> {
2303    let mut versions = BTreeMap::new();
2304    for binding in harn_vm::snapshot_trigger_bindings() {
2305        if binding.source != harn_vm::TriggerBindingSource::Manifest {
2306            continue;
2307        }
2308        if binding.state == harn_vm::TriggerState::Terminated {
2309            continue;
2310        }
2311        versions
2312            .entry(binding.id)
2313            .and_modify(|current: &mut u32| *current = (*current).max(binding.version))
2314            .or_insert(binding.version);
2315    }
2316    versions
2317}
2318
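/// Joins manifest triggers with their latest runtime binding and listener
/// metrics to produce the per-trigger rows stored in the state snapshot.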
2319fn trigger_state_snapshots(
2320    triggers: &[CollectedManifestTrigger],
2321    listener_metrics: &BTreeMap<String, TriggerMetricSnapshot>,
2322) -> Vec<TriggerStateSnapshot> {
2323    let bindings_by_id = harn_vm::snapshot_trigger_bindings()
2324        .into_iter()
2325        .filter(|binding| binding.source == harn_vm::TriggerBindingSource::Manifest)
2326        .fold(
2327            BTreeMap::<String, harn_vm::TriggerBindingSnapshot>::new(),
2328            |mut acc, binding| {
2329                match acc.get(&binding.id) {
2330                    Some(current) if current.version >= binding.version => {}
2331                    _ => {
2332                        acc.insert(binding.id.clone(), binding);
2333                    }
2334                }
2335                acc
2336            },
2337        );
2338
2339    triggers
2340        .iter()
2341        .map(|trigger| {
2342            let runtime = bindings_by_id.get(&trigger.config.id);
2343            let metrics = listener_metrics.get(&trigger.config.id);
2344            TriggerStateSnapshot {
2345                id: trigger.config.id.clone(),
2346                provider: trigger.config.provider.as_str().to_string(),
2347                kind: trigger_kind_name(trigger.config.kind).to_string(),
2348                handler: handler_kind(&trigger.handler).to_string(),
2349                version: runtime.map(|binding| binding.version),
2350                state: runtime.map(|binding| binding.state.as_str().to_string()),
2351                received: metrics.map_or(0, |value| value.received),
2352                dispatched: metrics.map_or(0, |value| value.dispatched),
2353                failed: metrics.map_or(0, |value| value.failed),
2354                in_flight: metrics.map_or(0, |value| value.in_flight),
2355            }
2356        })
2357        .collect()
2358}
2359
2360// ── Misc helpers ──────────────────────────────────────────────────────────────
2361
2362fn format_trigger_summary(triggers: &[CollectedManifestTrigger]) -> String {
2363    if triggers.is_empty() {
2364        return "none".to_string();
2365    }
2366    triggers
2367        .iter()
2368        .map(|trigger| {
2369            format!(
2370                "{} [{}:{} -> {}]",
2371                trigger.config.id,
2372                trigger.config.provider.as_str(),
2373                trigger_kind_name(trigger.config.kind),
2374                handler_kind(&trigger.handler)
2375            )
2376        })
2377        .collect::<Vec<_>>()
2378        .join(", ")
2379}
2380
2381fn format_activation_summary(activations: &[harn_vm::ActivationHandle]) -> String {
2382    if activations.is_empty() {
2383        return "none".to_string();
2384    }
2385    activations
2386        .iter()
2387        .map(|activation| {
2388            format!(
2389                "{}({})",
2390                activation.provider.as_str(),
2391                activation.binding_count
2392            )
2393        })
2394        .collect::<Vec<_>>()
2395        .join(", ")
2396}
2397
2398fn handler_kind(handler: &CollectedTriggerHandler) -> &'static str {
2399    match handler {
2400        CollectedTriggerHandler::Local { .. } => "local",
2401        CollectedTriggerHandler::A2a { .. } => "a2a",
2402        CollectedTriggerHandler::Worker { .. } => "worker",
2403        CollectedTriggerHandler::Persona { .. } => "persona",
2404    }
2405}
2406
2407fn trigger_kind_name(kind: crate::package::TriggerKind) -> &'static str {
2408    match kind {
2409        crate::package::TriggerKind::Webhook => "webhook",
2410        crate::package::TriggerKind::Cron => "cron",
2411        crate::package::TriggerKind::Poll => "poll",
2412        crate::package::TriggerKind::Stream => "stream",
2413        crate::package::TriggerKind::Predicate => "predicate",
2414        crate::package::TriggerKind::A2aPush => "a2a-push",
2415    }
2416}
2417
2418fn has_orchestrator_api_keys_configured() -> bool {
2419    std::env::var("HARN_ORCHESTRATOR_API_KEYS")
2420        .ok()
2421        .is_some_and(|value| value.split(',').any(|segment| !segment.trim().is_empty()))
2422}
2423
2424fn has_mcp_oauth_configured() -> bool {
2425    std::env::var("HARN_MCP_OAUTH_AUTHORIZATION_SERVERS")
2426        .ok()
2427        .is_some_and(|value| value.split(',').any(|segment| !segment.trim().is_empty()))
2428}
2429
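/// Checks that the embedded MCP paths start with '/', are not the root, do
/// not collide with reserved listener paths, and are unique.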
2430fn validate_mcp_paths(
2431    path: &str,
2432    sse_path: &str,
2433    messages_path: &str,
2434) -> Result<(), OrchestratorError> {
2435    let reserved = [
2436        "/health",
2437        "/healthz",
2438        "/readyz",
2439        "/metrics",
2440        "/admin/reload",
2441        "/acp",
2442    ];
2443    let mut seen = BTreeSet::new();
2444    for (label, value) in [
2445        ("--mcp-path", path),
2446        ("--mcp-sse-path", sse_path),
2447        ("--mcp-messages-path", messages_path),
2448    ] {
2449        if !value.starts_with('/') {
2450            return Err(format!("{label} must start with '/'").into());
2451        }
2452        if value == "/" {
2453            return Err(format!("{label} cannot be '/'").into());
2454        }
2455        if reserved.contains(&value) {
2456            return Err(format!("{label} cannot use reserved listener path '{value}'").into());
2457        }
2458        if !seen.insert(value) {
2459            return Err(format!("embedded MCP paths must be unique; duplicate '{value}'").into());
2460        }
2461    }
2462    Ok(())
2463}
2464
2465async fn append_lifecycle_event(
2466    log: &Arc<AnyEventLog>,
2467    kind: &str,
2468    payload: JsonValue,
2469) -> Result<(), OrchestratorError> {
2470    let topic =
2471        harn_vm::event_log::Topic::new(LIFECYCLE_TOPIC).map_err(|error| error.to_string())?;
2472    log.append(&topic, harn_vm::event_log::LogEvent::new(kind, payload))
2473        .await
2474        .map(|_| ())
2475        .map_err(|error| {
2476            OrchestratorError::Serve(format!(
2477                "failed to append orchestrator lifecycle event: {error}"
2478            ))
2479        })
2480}
2481
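/// Forwards pump-related events to the shared orchestrator lifecycle topic.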
2482async fn append_pump_lifecycle_event(
2483    log: &Arc<AnyEventLog>,
2484    kind: &str,
2485    payload: JsonValue,
2486) -> Result<(), OrchestratorError> {
2487    append_lifecycle_event(log, kind, payload).await
2488}
2489
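/// Holds event processing while the pump drain gate is raised, emitting a
/// `pump_drain_waiting` lifecycle event before waiting for release.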
2490async fn wait_for_pump_drain_release(
2491    log: &Arc<AnyEventLog>,
2492    topic: &harn_vm::event_log::Topic,
2493    event_id: u64,
2494    gate_rx: &mut watch::Receiver<bool>,
2495) -> Result<(), OrchestratorError> {
2496    if !*gate_rx.borrow() {
2497        return Ok(());
2498    }
2499    append_pump_lifecycle_event(
2500        log,
2501        "pump_drain_waiting",
2502        json!({
2503            "topic": topic.as_str(),
2504            "event_log_id": event_id,
2505        }),
2506    )
2507    .await?;
2508    while *gate_rx.borrow() {
2509        if gate_rx.changed().await.is_err() {
2510            break;
2511        }
2512    }
2513    Ok(())
2514}
2515
2516#[cfg_attr(not(unix), allow(dead_code))]
2517async fn append_manifest_event(
2518    log: &Arc<AnyEventLog>,
2519    kind: &str,
2520    payload: JsonValue,
2521) -> Result<(), OrchestratorError> {
2522    let topic =
2523        harn_vm::event_log::Topic::new(MANIFEST_TOPIC).map_err(|error| error.to_string())?;
2524    log.append(&topic, harn_vm::event_log::LogEvent::new(kind, payload))
2525        .await
2526        .map(|_| ())
2527        .map_err(|error| {
2528            OrchestratorError::Serve(format!(
2529                "failed to append orchestrator manifest event: {error}"
2530            ))
2531        })
2532}
2533
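/// Refreshes the pump backlog and outstanding gauges from the topic head and
/// records a `pump_metrics_recorded` lifecycle event.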
2534async fn record_pump_metrics(
2535    metrics: &harn_vm::MetricsRegistry,
2536    log: &Arc<AnyEventLog>,
2537    topic: &harn_vm::event_log::Topic,
2538    last_seen: u64,
2539    outstanding: usize,
2540) -> Result<(), OrchestratorError> {
2541    let latest = log.latest(topic).await.ok().flatten().unwrap_or(last_seen);
2542    let backlog = latest.saturating_sub(last_seen);
2543    metrics.set_orchestrator_pump_outstanding(topic.as_str(), outstanding);
2544    metrics.set_orchestrator_pump_backlog(topic.as_str(), backlog);
2545    append_pump_lifecycle_event(
2546        log,
2547        "pump_metrics_recorded",
2548        json!({
2549            "topic": topic.as_str(),
2550            "latest": latest,
2551            "last_seen": last_seen,
2552            "backlog": backlog,
2553            "outstanding": outstanding,
2554        }),
2555    )
2556    .await
2557}
2558
2559fn admission_delay(occurred_at_ms: i64) -> Duration {
2560    let now = std::time::SystemTime::now()
2561        .duration_since(std::time::UNIX_EPOCH)
2562        .unwrap_or_default()
2563        .as_millis() as i64;
2564    Duration::from_millis(now.saturating_sub(occurred_at_ms).max(0) as u64)
2565}
2566
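/// Warns and appends a `drain_truncated` lifecycle event when a pump drain
/// report indicates truncation.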
2567async fn emit_drain_truncated(
2568    log: &Arc<AnyEventLog>,
2569    topic_name: &str,
2570    report: PumpDrainReport,
2571    config: DrainConfig,
2572) -> Result<(), OrchestratorError> {
2573    if !report.truncated() {
2574        return Ok(());
2575    }
2576    eprintln!(
2577        "[harn] warning: pump drain truncated for {topic_name}: remaining_queued={} drain_items={} reason={}",
2578        report.remaining_queued,
2579        report.drain_items,
2580        report.stop_reason.as_str()
2581    );
2582    append_lifecycle_event(
2583        log,
2584        "drain_truncated",
2585        json!({
2586            "topic": topic_name,
2587            "remaining_queued": report.remaining_queued,
2588            "drain_items": report.drain_items,
2589            "outstanding_tasks": report.outstanding_tasks,
2590            "max_items": config.max_items,
2591            "deadline_secs": config.deadline.as_secs(),
2592            "reason": report.stop_reason.as_str(),
2593        }),
2594    )
2595    .await
2596}
2597
2598async fn topic_latest_id(
2599    log: &Arc<AnyEventLog>,
2600    topic_name: &str,
2601) -> Result<u64, OrchestratorError> {
2602    let topic = harn_vm::event_log::Topic::new(topic_name).map_err(|error| error.to_string())?;
2603    log.latest(&topic)
2604        .await
2605        .map(|value| value.unwrap_or(0))
2606        .map_err(|error| {
2607            OrchestratorError::Serve(format!(
2608                "failed to read topic head for {topic_name}: {error}"
2609            ))
2610        })
2611}
2612
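/// Drains a pump up to the current topic head within the remaining shutdown
/// budget; on error or timeout, derives a best-effort report from the
/// consumer cursor instead.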
2613async fn drain_pump_best_effort(
2614    log: &Arc<AnyEventLog>,
2615    topic_name: &str,
2616    pump: PumpHandle,
2617    config: DrainConfig,
2618    overall_deadline: tokio::time::Instant,
2619) -> Result<PumpDrainReport, OrchestratorError> {
2620    let topic = harn_vm::event_log::Topic::new(topic_name).map_err(|error| error.to_string())?;
2621    let consumer = pump_consumer_id(&topic)?;
2622    let start_seen = log
2623        .consumer_cursor(&topic, &consumer)
2624        .await
2625        .map_err(|error| format!("failed to read consumer cursor for {topic_name}: {error}"))?
2626        .unwrap_or(0);
2627    let up_to = log
2628        .latest(&topic)
2629        .await
2630        .map_err(|error| format!("failed to read topic head for {topic_name}: {error}"))?
2631        .unwrap_or(0);
2632    let budget = remaining_budget(overall_deadline);
2633
2634    match tokio::time::timeout(
2635        budget,
2636        pump.drain(log, topic_name, up_to, config, overall_deadline),
2637    )
2638    .await
2639    {
2640        Ok(Ok(report)) => Ok(report),
2641        Ok(Err(error)) => {
2642            eprintln!("[harn] warning: pump drain error for {topic_name}: {error}");
2643            best_effort_pump_report(
2644                log,
2645                &topic,
2646                &consumer,
2647                start_seen,
2648                up_to,
2649                PumpDrainStopReason::Error,
2650            )
2651            .await
2652        }
2653        Err(_) => {
2654            eprintln!(
2655                "[harn] warning: pump drain timed out for {topic_name} after {:?}",
2656                budget
2657            );
2658            best_effort_pump_report(
2659                log,
2660                &topic,
2661                &consumer,
2662                start_seen,
2663                up_to,
2664                PumpDrainStopReason::Deadline,
2665            )
2666            .await
2667        }
2668    }
2669}
2670
2671async fn best_effort_pump_report(
2672    log: &Arc<AnyEventLog>,
2673    topic: &harn_vm::event_log::Topic,
2674    consumer: &ConsumerId,
2675    start_seen: u64,
2676    up_to: u64,
2677    stop_reason: PumpDrainStopReason,
2678) -> Result<PumpDrainReport, OrchestratorError> {
2679    let last_seen = log
2680        .consumer_cursor(topic, consumer)
2681        .await
2682        .map_err(|error| format!("failed to read consumer cursor for {topic}: {error}"))?
2683        .unwrap_or(start_seen);
2684    let stats = PumpStats {
2685        last_seen,
2686        processed: last_seen.saturating_sub(start_seen),
2687    };
2688    Ok(pump_drain_report(stats, start_seen, up_to, 0, stop_reason))
2689}
2690
2691fn remaining_budget(deadline: tokio::time::Instant) -> Duration {
2692    deadline.saturating_duration_since(tokio::time::Instant::now())
2693}
2694
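/// Returns a drain report once the pump has caught up to the requested id,
/// hit the configured item limit, or passed the drain deadline; returns
/// `None` while the drain should keep running.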
2695fn maybe_finish_pump_drain(
2696    stats: PumpStats,
2697    progress: PumpDrainProgress,
2698    outstanding_tasks: usize,
2699) -> Option<PumpDrainReport> {
2700    if stats.last_seen >= progress.request.up_to && outstanding_tasks == 0 {
2701        return Some(pump_drain_report(
2702            stats,
2703            progress.start_seen,
2704            progress.request.up_to,
2705            outstanding_tasks,
2706            PumpDrainStopReason::Drained,
2707        ));
2708    }
2709    if outstanding_tasks > 0 {
2710        if tokio::time::Instant::now() >= progress.request.deadline {
2711            return Some(pump_drain_report(
2712                stats,
2713                progress.start_seen,
2714                progress.request.up_to,
2715                outstanding_tasks,
2716                PumpDrainStopReason::Deadline,
2717            ));
2718        }
2719        return None;
2720    }
2721    let drain_items = stats.last_seen.saturating_sub(progress.start_seen);
2722    if drain_items >= progress.request.config.max_items as u64 {
2723        return Some(pump_drain_report(
2724            stats,
2725            progress.start_seen,
2726            progress.request.up_to,
2727            outstanding_tasks,
2728            PumpDrainStopReason::MaxItems,
2729        ));
2730    }
2731    if tokio::time::Instant::now() >= progress.request.deadline {
2732        return Some(pump_drain_report(
2733            stats,
2734            progress.start_seen,
2735            progress.request.up_to,
2736            outstanding_tasks,
2737            PumpDrainStopReason::Deadline,
2738        ));
2739    }
2740    None
2741}
2742
2743fn pump_drain_report(
2744    stats: PumpStats,
2745    start_seen: u64,
2746    up_to: u64,
2747    outstanding_tasks: usize,
2748    stop_reason: PumpDrainStopReason,
2749) -> PumpDrainReport {
2750    PumpDrainReport {
2751        stats,
2752        drain_items: stats.last_seen.saturating_sub(start_seen),
2753        remaining_queued: up_to.saturating_sub(stats.last_seen),
2754        outstanding_tasks,
2755        stop_reason,
2756    }
2757}
2758
2759fn pump_consumer_id(topic: &harn_vm::event_log::Topic) -> Result<ConsumerId, OrchestratorError> {
2760    ConsumerId::new(format!("orchestrator-pump.{}", topic.as_str())).map_err(|error| {
2761        OrchestratorError::Serve(format!("failed to create consumer id for {topic}: {error}"))
2762    })
2763}
2764
2765fn inbox_task_test_release_file() -> Option<PathBuf> {
2766    test_file_from_env(TEST_INBOX_TASK_RELEASE_FILE_ENV)
2767}
2768
2769fn test_file_from_env(key: &str) -> Option<PathBuf> {
2770    std::env::var_os(key)
2771        .filter(|value| !value.is_empty())
2772        .map(PathBuf::from)
2773}
2774
2775async fn wait_for_test_release_file(path: &Path) {
2776    while tokio::fs::metadata(path).await.is_err() {
2777        tokio::time::sleep(Duration::from_millis(10)).await;
2778    }
2779}
2780
2781fn pending_pump_test_should_fail() -> bool {
2782    std::env::var(TEST_FAIL_PENDING_PUMP_ENV)
2783        .ok()
2784        .is_some_and(|value| value != "0")
2785}
2786
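/// Watches the manifest's parent directory (non-recursively) and triggers a
/// debounced `file_watch` reload whenever the manifest file changes.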
2787fn spawn_manifest_watcher(
2788    config_path: PathBuf,
2789    reload: AdminReloadHandle,
2790) -> Result<notify::RecommendedWatcher, OrchestratorError> {
2791    use notify::{Event, EventKind, RecursiveMode, Watcher};
2792
2793    let watch_dir = config_path.parent().ok_or_else(|| {
2794        format!(
2795            "manifest has no parent directory: {}",
2796            config_path.display()
2797        )
2798    })?;
2799    let target_name = config_path
2800        .file_name()
2801        .and_then(|name| name.to_str())
2802        .ok_or_else(|| {
2803            format!(
2804                "manifest path is not valid UTF-8: {}",
2805                config_path.display()
2806            )
2807        })?
2808        .to_string();
2809    let (tx, mut rx) = mpsc::unbounded_channel::<()>();
2810    tokio::task::spawn_local(async move {
2811        while rx.recv().await.is_some() {
2812            tokio::time::sleep(Duration::from_millis(200)).await;
2813            while rx.try_recv().is_ok() {}
2814            let _ = reload.trigger("file_watch");
2815        }
2816    });
2817    let mut watcher =
2818        notify::recommended_watcher(move |res: Result<Event, notify::Error>| match res {
2819            Ok(event)
2820                if matches!(
2821                    event.kind,
2822                    EventKind::Modify(_)
2823                        | EventKind::Create(_)
2824                        | EventKind::Remove(_)
2825                        | EventKind::Any
2826                ) && event.paths.iter().any(|path| {
2827                    path.file_name()
2828                        .and_then(|name| name.to_str())
2829                        .is_some_and(|name| name == target_name)
2830                }) =>
2831            {
2832                let _ = tx.send(());
2833            }
2834            _ => {}
2835        })
2836        .map_err(|error| format!("failed to create manifest watcher: {error}"))?;
2837    watcher
2838        .watch(watch_dir, RecursiveMode::NonRecursive)
2839        .map_err(|error| {
2840            format!(
2841                "failed to watch manifest directory {}: {error}",
2842                watch_dir.display()
2843            )
2844        })?;
2845    Ok(watcher)
2846}
2847
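/// Reads and parses the manifest TOML, returning it with its parent directory.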
2848pub(crate) fn load_manifest(config_path: &Path) -> Result<(Manifest, PathBuf), OrchestratorError> {
2849    if !config_path.is_file() {
2850        return Err(format!("manifest not found: {}", config_path.display()).into());
2851    }
2852    let content = std::fs::read_to_string(config_path)
2853        .map_err(|error| format!("failed to read {}: {error}", config_path.display()))?;
2854    let manifest = toml::from_str::<Manifest>(&content)
2855        .map_err(|error| format!("failed to parse {}: {error}", config_path.display()))?;
2856    let manifest_dir = config_path.parent().map(Path::to_path_buf).ok_or_else(|| {
2857        format!(
2858            "manifest has no parent directory: {}",
2859            config_path.display()
2860        )
2861    })?;
2862    Ok((manifest, manifest_dir))
2863}
2864
2865pub(crate) fn absolutize_from_cwd(path: &Path) -> Result<PathBuf, OrchestratorError> {
2866    let candidate = if path.is_absolute() {
2867        path.to_path_buf()
2868    } else {
2869        std::env::current_dir()
2870            .map_err(|error| format!("failed to read current directory: {error}"))?
2871            .join(path)
2872    };
2873    Ok(candidate)
2874}
2875
2876fn configured_secret_chain_display() -> String {
2877    std::env::var(harn_vm::secrets::SECRET_PROVIDER_CHAIN_ENV)
2878        .unwrap_or_else(|_| harn_vm::secrets::DEFAULT_SECRET_PROVIDER_CHAIN.to_string())
2879        .split(',')
2880        .map(str::trim)
2881        .filter(|segment| !segment.is_empty())
2882        .collect::<Vec<_>>()
2883        .join(" -> ")
2884}
2885
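/// Resolves the secret namespace from `HARN_SECRET_NAMESPACE`, defaulting to
/// `harn/<manifest directory name>`.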
2886fn secret_namespace_for(manifest_dir: &Path) -> String {
2887    match std::env::var("HARN_SECRET_NAMESPACE") {
2888        Ok(namespace) if !namespace.trim().is_empty() => namespace,
2889        _ => {
2890            let leaf = manifest_dir
2891                .file_name()
2892                .and_then(|name| name.to_str())
2893                .filter(|name| !name.is_empty())
2894                .unwrap_or("workspace");
2895            format!("harn/{leaf}")
2896        }
2897    }
2898}
2899
2900fn now_rfc3339() -> Result<String, OrchestratorError> {
2901    OffsetDateTime::now_utc()
2902        .format(&Rfc3339)
2903        .map_err(|error| OrchestratorError::Serve(format!("failed to format timestamp: {error}")))
2904}
2905
fn write_state_snapshot(
    path: &Path,
    snapshot: &ServeStateSnapshot,
) -> Result<(), OrchestratorError> {
    let encoded = serde_json::to_vec_pretty(snapshot)
        .map_err(|error| format!("failed to encode orchestrator state snapshot: {error}"))?;
    if let Some(parent) = path.parent() {
        std::fs::create_dir_all(parent)
            .map_err(|error| format!("failed to create {}: {error}", parent.display()))?;
    }
    std::fs::write(path, encoded).map_err(|error| {
        OrchestratorError::Serve(format!("failed to write {}: {error}", path.display()))
    })
}

// ── Snapshot types ────────────────────────────────────────────────────────────

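/// Top-level state snapshot serialized by `write_state_snapshot`. The
/// `triggers`, `connectors`, and `activations` fields summarize the
/// per-trigger counters and connector activations described by the structs
/// below.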
#[derive(Debug, Serialize)]
struct ServeStateSnapshot {
    status: String,
    role: String,
    bind: String,
    listener_url: String,
    manifest_path: String,
    state_dir: String,
    started_at: String,
    stopped_at: Option<String>,
    secret_provider_chain: String,
    event_log_backend: String,
    event_log_location: Option<String>,
    triggers: Vec<TriggerStateSnapshot>,
    connectors: Vec<String>,
    activations: Vec<ConnectorActivationSnapshot>,
}

#[derive(Debug, Serialize)]
struct TriggerStateSnapshot {
    id: String,
    provider: String,
    kind: String,
    handler: String,
    version: Option<u32>,
    state: Option<String>,
    received: u64,
    dispatched: u64,
    failed: u64,
    in_flight: u64,
}

#[derive(Debug, Serialize)]
struct ConnectorActivationSnapshot {
    provider: String,
    binding_count: usize,
}

// ── Placeholder connector ─────────────────────────────────────────────────────

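/// Stand-in connector for the orchestrator scaffold: it advertises a provider
/// id and the trigger kinds it is constructed with, but inbound normalization
/// and client calls return "not implemented" errors until a real connector
/// replaces it.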
struct PlaceholderConnector {
    provider_id: harn_vm::ProviderId,
    kinds: Vec<harn_vm::TriggerKind>,
    _ctx: Option<harn_vm::ConnectorCtx>,
}

impl PlaceholderConnector {
    fn new(provider_id: harn_vm::ProviderId, kinds: BTreeSet<String>) -> Self {
        Self {
            provider_id,
            kinds: kinds.into_iter().map(harn_vm::TriggerKind::from).collect(),
            _ctx: None,
        }
    }
}

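/// `ConnectorClient` stub handed out by `PlaceholderConnector::client`; every
/// `call` fails with `ClientError::Other`.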
struct PlaceholderClient;

#[async_trait]
impl harn_vm::ConnectorClient for PlaceholderClient {
    async fn call(
        &self,
        method: &str,
        _args: JsonValue,
    ) -> Result<JsonValue, harn_vm::ClientError> {
        Err(harn_vm::ClientError::Other(format!(
            "connector client method '{method}' is not implemented in the orchestrator scaffold"
        )))
    }
}

#[async_trait]
impl harn_vm::Connector for PlaceholderConnector {
    fn provider_id(&self) -> &harn_vm::ProviderId {
        &self.provider_id
    }

    fn kinds(&self) -> &[harn_vm::TriggerKind] {
        &self.kinds
    }

    async fn init(&mut self, ctx: harn_vm::ConnectorCtx) -> Result<(), harn_vm::ConnectorError> {
        self._ctx = Some(ctx);
        Ok(())
    }

    async fn activate(
        &self,
        bindings: &[harn_vm::TriggerBinding],
    ) -> Result<harn_vm::ActivationHandle, harn_vm::ConnectorError> {
        Ok(harn_vm::ActivationHandle::new(
            self.provider_id.clone(),
            bindings.len(),
        ))
    }

    async fn normalize_inbound(
        &self,
        _raw: harn_vm::RawInbound,
    ) -> Result<harn_vm::TriggerEvent, harn_vm::ConnectorError> {
        Err(harn_vm::ConnectorError::Unsupported(format!(
            "connector '{}' inbound normalization is not implemented yet",
            self.provider_id.as_str()
        )))
    }

    fn payload_schema(&self) -> harn_vm::ProviderPayloadSchema {
        harn_vm::ProviderPayloadSchema::named("TriggerEvent")
    }

    fn client(&self) -> Arc<dyn harn_vm::ConnectorClient> {
        Arc::new(PlaceholderClient)
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use futures::StreamExt;
    use harn_vm::event_log::{EventLog, Topic};

    fn write_test_file(dir: &Path, relative: &str, contents: &str) {
        let path = dir.join(relative);
        if let Some(parent) = path.parent() {
            std::fs::create_dir_all(parent).unwrap();
        }
        std::fs::write(path, contents).unwrap();
    }

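    /// Manifest fixture: one `stream` trigger on `/streams/ws`, served by the
    /// generic `websocket` provider and routed to `handlers::on_stream` for
    /// events matching `quote.tick`.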
    fn stream_manifest_fixture() -> &'static str {
        r#"
[package]
name = "fixture"

[exports]
handlers = "lib.harn"

[[triggers]]
id = "ws-stream"
kind = "stream"
provider = "websocket"
path = "/streams/ws"
match = { events = ["quote.tick"] }
handler = "handlers::on_stream"
"#
    }

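    /// Handler fixture: `on_stream` writes selected `TriggerEvent` fields to
    /// `marker_path` as JSON so the test can assert on what the handler saw.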
    fn stream_handler_fixture(marker_path: &Path) -> String {
        format!(
            r#"
import "std/triggers"

pub fn on_stream(event: TriggerEvent) {{
  write_file({marker:?}, json_stringify({{
    provider: event.provider,
    kind: event.kind,
    key: event.provider_payload.key,
    stream: event.provider_payload.stream,
    amount: event.provider_payload.raw.value.amount,
  }}))
}}
"#,
            marker = marker_path.display().to_string()
        )
    }

    /// Proof test: `stream_trigger_route_uses_generic_stream_connector`,
    /// migrated from a subprocess + SQLite-polling setup to the in-process
    /// harness driven by `EventLog::subscribe()`.
    #[tokio::test(flavor = "multi_thread")]
    async fn stream_trigger_route_uses_generic_stream_connector_in_process() {
        // Env vars are process-global; hold the lock for the entire test so
        // concurrent unit tests that also set env vars don't race.
        let _env_lock = crate::tests::common::env_lock::lock_env().lock().await;
        let _secret_providers = crate::env_guard::ScopedEnvVar::set("HARN_SECRET_PROVIDERS", "env");

        let temp = tempfile::TempDir::new().unwrap();
        let marker_path = temp.path().join("stream-handler.json");
        write_test_file(temp.path(), "harn.toml", stream_manifest_fixture());
        write_test_file(
            temp.path(),
            "lib.harn",
            &stream_handler_fixture(&marker_path),
        );

        let config =
            OrchestratorConfig::for_test(temp.path().join("harn.toml"), temp.path().join("state"));
        let harness = OrchestratorHarness::start(config)
            .await
            .expect("harness start");
        let base_url = harness.listener_url().to_string();
        let event_log = harness.event_log();

        let response = reqwest::Client::new()
            .post(format!("{base_url}/streams/ws"))
            .header("content-type", "application/json")
            .json(&serde_json::json!({
                "key": "acct-1",
                "stream": "quotes",
                "value": {"amount": 10}
            }))
            .send()
            .await
            .unwrap();
        assert_eq!(response.status(), 200);

        // Event-driven wait: subscribe to orchestrator.lifecycle and block
        // until pump_dispatch_completed arrives, replacing SQLite polling.
        let topic = Topic::new("orchestrator.lifecycle").unwrap();
        let mut stream = event_log.clone().subscribe(&topic, None).await.unwrap();
        let deadline = tokio::time::Instant::now() + std::time::Duration::from_secs(30);
        loop {
            let remaining = deadline
                .checked_duration_since(tokio::time::Instant::now())
                .expect("timed out waiting for pump_dispatch_completed");
            let (_, event) = tokio::time::timeout(remaining, stream.next())
                .await
                .expect("timed out waiting for pump_dispatch_completed event")
                .expect("event stream ended unexpectedly")
                .expect("event stream error");
            if event.kind == "pump_dispatch_completed"
                && event.payload["status"] == serde_json::json!("completed")
            {
                break;
            }
        }
        drop(stream);

        let marker: serde_json::Value =
            serde_json::from_str(&std::fs::read_to_string(&marker_path).unwrap()).unwrap();
        assert_eq!(
            marker.get("provider").and_then(|v| v.as_str()),
            Some("websocket")
        );
        assert_eq!(
            marker.get("kind").and_then(|v| v.as_str()),
            Some("quote.tick")
        );
        assert_eq!(marker.get("key").and_then(|v| v.as_str()), Some("acct-1"));
        assert_eq!(
            marker.get("stream").and_then(|v| v.as_str()),
            Some("quotes")
        );
        assert_eq!(marker.get("amount").and_then(|v| v.as_i64()), Some(10));

        harness
            .shutdown(std::time::Duration::from_secs(5))
            .await
            .expect("harness shutdown");
    }
}