
harn_cli/commands/orchestrator/harness.rs

use std::collections::{BTreeMap, BTreeSet};
use std::net::SocketAddr;
use std::path::{Path, PathBuf};
use std::sync::Arc;
use std::time::Duration;

use async_trait::async_trait;
use futures::StreamExt;
use serde::{Deserialize, Serialize};
use serde_json::{json, Value as JsonValue};
use time::format_description::well_known::Rfc3339;
use time::OffsetDateTime;
use tokio::sync::{mpsc, oneshot, watch};
use tokio::task::JoinSet;

use harn_vm::event_log::{AnyEventLog, ConsumerId, EventLog};

use super::common::stranded_envelopes;
use super::errors::OrchestratorError;
use super::listener::{
    AdminReloadHandle, AdminReloadRequest, ListenerConfig, ListenerRuntime, RouteConfig,
    TriggerMetricSnapshot,
};
use super::origin_guard::OriginAllowList;
use super::role::OrchestratorRole;
use super::tls::TlsFiles;
use crate::package::{
    self, CollectedManifestTrigger, CollectedTriggerHandler, Manifest,
    ResolvedProviderConnectorConfig, ResolvedProviderConnectorKind, ResolvedTriggerConfig,
};

const LIFECYCLE_TOPIC: &str = "orchestrator.lifecycle";
#[cfg_attr(not(unix), allow(dead_code))]
const MANIFEST_TOPIC: &str = "orchestrator.manifest";
const STATE_SNAPSHOT_FILE: &str = "orchestrator-state.json";
const PENDING_TOPIC: &str = "orchestrator.triggers.pending";
const CRON_TICK_TOPIC: &str = "connectors.cron.tick";
const TEST_INBOX_TASK_RELEASE_FILE_ENV: &str = "HARN_TEST_ORCHESTRATOR_INBOX_TASK_RELEASE_FILE";
const TEST_FAIL_PENDING_PUMP_ENV: &str = "HARN_TEST_ORCHESTRATOR_FAIL_PENDING_PUMP";
const WAITPOINT_SERVICE_INTERVAL: Duration = Duration::from_millis(250);

// ── Public config ─────────────────────────────────────────────────────────────

/// Drain limits applied during graceful shutdown.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub struct DrainConfig {
    pub max_items: usize,
    pub deadline: Duration,
}

impl Default for DrainConfig {
    fn default() -> Self {
        Self {
            max_items: crate::package::default_orchestrator_drain_max_items(),
            deadline: Duration::from_secs(
                crate::package::default_orchestrator_drain_deadline_seconds(),
            ),
        }
    }
}

/// Topic-pump concurrency limit.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub struct PumpConfig {
    pub max_outstanding: usize,
}

impl Default for PumpConfig {
    fn default() -> Self {
        Self {
            max_outstanding: crate::package::default_orchestrator_pump_max_outstanding(),
        }
    }
}
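
/*
Example (illustrative sketch, not part of this module): callers that want
tighter shutdown limits than the package defaults can build the structs
directly; the numbers below are placeholders.

    let drain = DrainConfig { max_items: 16, deadline: Duration::from_secs(2) };
    let pump = PumpConfig { max_outstanding: 4 };
*/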

/// Configuration for `OrchestratorHarness::start`.
#[derive(Clone, Debug)]
pub struct OrchestratorConfig {
    pub manifest_path: PathBuf,
    pub state_dir: PathBuf,
    pub bind: SocketAddr,
    pub role: OrchestratorRole,
    pub watch_manifest: bool,
    pub mcp: bool,
    pub mcp_path: String,
    pub mcp_sse_path: String,
    pub mcp_messages_path: String,
    pub tls: Option<TlsFiles>,
    pub shutdown_timeout: Duration,
    pub drain: DrainConfig,
    pub pump: PumpConfig,
    /// When `Some`, installs an observability tracing subscriber.  Tests
    /// should leave this `None` to avoid conflicts with the test runtime.
    pub log_format: Option<harn_vm::observability::otel::LogFormat>,
}

impl OrchestratorConfig {
    /// Minimal defaults suitable for integration tests.
    pub fn for_test(manifest_path: PathBuf, state_dir: PathBuf) -> Self {
        Self {
            manifest_path,
            state_dir,
            bind: "127.0.0.1:0".parse().unwrap(),
            role: OrchestratorRole::SingleTenant,
            watch_manifest: false,
            mcp: false,
            mcp_path: "/mcp".to_string(),
            mcp_sse_path: "/sse".to_string(),
            mcp_messages_path: "/messages".to_string(),
            tls: None,
            shutdown_timeout: Duration::from_secs(5),
            drain: DrainConfig::default(),
            pump: PumpConfig::default(),
            log_format: None,
        }
    }
}
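
/*
Example (sketch): an integration test might build the config with `for_test`
and hand it to `OrchestratorHarness::start` (defined below). The manifest and
state-dir paths are hypothetical.

    let config = OrchestratorConfig::for_test(
        workspace.join("manifest.toml"), // hypothetical manifest path
        workspace.join("state"),         // hypothetical state dir
    );
    let harness = OrchestratorHarness::start(config).await?;
    assert!(harness.listener_url().starts_with("http"));
*/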

// ── Public harness ────────────────────────────────────────────────────────────

/// Summary returned by `OrchestratorHarness::shutdown`.
#[derive(Debug)]
pub struct ShutdownReport {
    #[allow(dead_code)]
    pub timed_out: bool,
}

/// Error type for harness operations.
#[derive(Debug)]
pub struct HarnessError(pub String);

impl std::fmt::Display for HarnessError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.write_str(&self.0)
    }
}

impl From<OrchestratorError> for HarnessError {
    fn from(error: OrchestratorError) -> Self {
        HarnessError(error.to_string())
    }
}

impl From<String> for HarnessError {
    fn from(s: String) -> Self {
        HarnessError(s)
    }
}

/// In-process orchestrator runtime.
///
/// Starts all pumps, connectors, and the HTTP listener on a dedicated
/// background thread that runs its own Tokio runtime driving a `LocalSet`.
/// Tests hold `Arc<AnyEventLog>` directly and use `EventLog::subscribe()` for
/// event-driven waits instead of polling.
#[allow(dead_code)]
pub struct OrchestratorHarness {
    event_log: Arc<AnyEventLog>,
    listener_url: String,
    local_addr: SocketAddr,
    state_dir: PathBuf,
    admin_reload: AdminReloadHandle,
    shutdown_tx: Arc<watch::Sender<bool>>,
    pump_drain_gate: PumpDrainGate,
    join: Option<std::thread::JoinHandle<()>>,
}

struct ReadyState {
    event_log: Arc<AnyEventLog>,
    listener_url: String,
    local_addr: SocketAddr,
    state_dir: PathBuf,
    admin_reload: AdminReloadHandle,
}

#[derive(Clone)]
struct PumpDrainGate {
    hold_tx: watch::Sender<bool>,
}

impl PumpDrainGate {
    fn new() -> Self {
        let (hold_tx, _) = watch::channel(false);
        Self { hold_tx }
    }

    fn pause(&self) {
        let _ = self.hold_tx.send(true);
    }

    fn release(&self) {
        let _ = self.hold_tx.send(false);
    }

    fn subscribe(&self) -> watch::Receiver<bool> {
        self.hold_tx.subscribe()
    }
}

#[allow(dead_code)]
impl OrchestratorHarness {
    /// Start the orchestrator in-process.  Resolves once the HTTP listener
    /// is ready and the startup lifecycle event has been appended.
    pub async fn start(config: OrchestratorConfig) -> Result<Self, HarnessError> {
        let (ready_tx, ready_rx) = oneshot::channel::<Result<ReadyState, OrchestratorError>>();
        let (shutdown_tx, shutdown_rx) = watch::channel(false);
        let shutdown_tx = Arc::new(shutdown_tx);
        let pump_drain_gate = PumpDrainGate::new();
        let task_pump_drain_gate = pump_drain_gate.clone();

        let join = std::thread::spawn(move || {
            // Use a multi-thread runtime so that blocking I/O (e.g. the OTEL
            // SimpleSpanProcessor calling futures::executor::block_on for each
            // span) can proceed on reactor threads while the LocalSet thread is
            // temporarily paused.  A current-thread runtime would deadlock:
            // block_on holds the only thread while the reactor needs that same
            // thread to drive the TCP export.
            let rt = tokio::runtime::Builder::new_multi_thread()
                .worker_threads(2)
                .enable_all()
                .build()
                .expect("failed to build OrchestratorHarness tokio runtime");
            let local = tokio::task::LocalSet::new();
            rt.block_on(local.run_until(orchestrator_task(
                config,
                ready_tx,
                shutdown_rx,
                task_pump_drain_gate,
            )));
        });

        match ready_rx.await {
            Ok(Ok(ready)) => Ok(Self {
                event_log: ready.event_log,
                listener_url: ready.listener_url,
                local_addr: ready.local_addr,
                state_dir: ready.state_dir,
                admin_reload: ready.admin_reload,
                shutdown_tx,
                pump_drain_gate,
                join: Some(join),
            }),
            Ok(Err(error)) => {
                let _ = join.join();
                Err(HarnessError::from(error))
            }
            Err(_) => {
                let _ = join.join();
                Err(HarnessError(
                    "harness thread exited before signaling readiness".to_string(),
                ))
            }
        }
    }

    pub fn listener_url(&self) -> &str {
        &self.listener_url
    }

    pub fn local_addr(&self) -> SocketAddr {
        self.local_addr
    }

    pub fn event_log(&self) -> Arc<AnyEventLog> {
        self.event_log.clone()
    }

    pub fn state_dir(&self) -> &Path {
        &self.state_dir
    }

    /// Returns a handle that can be used to trigger admin-reload from outside
    /// the harness (e.g. on SIGHUP in the CLI wrapper).
    pub fn admin_reload(&self) -> AdminReloadHandle {
        self.admin_reload.clone()
    }

    /// Returns a sender that, when `send(true)` is called, triggers graceful
    /// shutdown.  Use this to wire OS signal handlers in the CLI wrapper.
    pub fn shutdown_trigger(&self) -> Arc<watch::Sender<bool>> {
        self.shutdown_tx.clone()
    }

    /// Pause topic pumps at the next event admission. Tests can wait for
    /// `pump_drain_waiting` before triggering shutdown.
    pub fn pause_pump_drain(&self) {
        self.pump_drain_gate.pause();
    }

    /// Release topic pumps paused by `pause_pump_drain`.
    pub fn release_pump_drain(&self) {
        self.pump_drain_gate.release();
    }

    /// Cancel-safe, idempotent.  Performs the same drain logic as SIGTERM.
    pub async fn shutdown(mut self, _deadline: Duration) -> Result<ShutdownReport, HarnessError> {
        // Signal the background runtime to start graceful shutdown.
        let _ = self.shutdown_tx.send(true);
        // Wait for the background thread to finish (graceful shutdown runs there).
        let join = self.join.take().expect("join handle");
        tokio::task::spawn_blocking(move || join.join())
            .await
            .map_err(|_| HarnessError("spawn_blocking join failed".to_string()))?
            .map_err(|_| HarnessError("harness background thread panicked".to_string()))?;
        Ok(ShutdownReport { timed_out: false })
    }
}
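
/*
Example (sketch): the event-driven wait described in the struct docs. The
topic name comes from LIFECYCLE_TOPIC above; the assumption that the logged
`kind` equals the name passed to `append_lifecycle_event` ("startup") is
illustrative, not guaranteed by this file alone.

    let log = harness.event_log();
    let topic = harn_vm::event_log::Topic::new(LIFECYCLE_TOPIC)?;
    let mut events = log.clone().subscribe(&topic, None).await?;
    while let Some(next) = events.next().await {
        let (_id, logged) = next?;
        if logged.kind == "startup" {
            break;
        }
    }
    harness.shutdown(Duration::from_secs(5)).await?;
*/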

// ── Internal lifecycle ────────────────────────────────────────────────────────

async fn orchestrator_task(
    config: OrchestratorConfig,
    ready_tx: oneshot::Sender<Result<ReadyState, OrchestratorError>>,
    shutdown_rx: watch::Receiver<bool>,
    pump_drain_gate: PumpDrainGate,
) {
    if let Err(error) = orchestrator_lifecycle(config, ready_tx, shutdown_rx, pump_drain_gate).await
    {
        eprintln!("[harn] orchestrator harness error: {error}");
    }
}

async fn orchestrator_lifecycle(
    config: OrchestratorConfig,
    ready_tx: oneshot::Sender<Result<ReadyState, OrchestratorError>>,
    mut shutdown_rx: watch::Receiver<bool>,
    pump_drain_gate: PumpDrainGate,
) -> Result<(), OrchestratorError> {
    harn_vm::reset_thread_local_state();

    let shutdown_timeout = config.shutdown_timeout;
    let drain_config = config.drain;
    let pump_config = PumpConfig {
        max_outstanding: config.pump.max_outstanding.max(1),
    };

    let state_dir = config.state_dir.clone();
    std::fs::create_dir_all(&state_dir).map_err(|error| {
        format!(
            "failed to create state dir {}: {error}",
            state_dir.display()
        )
    })?;

    let observability = if let Some(log_format) = config.log_format {
        Some(
            harn_vm::observability::otel::ObservabilityGuard::install_orchestrator_subscriber(
                harn_vm::observability::otel::OrchestratorObservabilityConfig {
                    log_format,
                    state_dir: Some(state_dir.clone()),
                },
            )?,
        )
    } else {
        None
    };

    let config_path = absolutize_from_cwd(&config.manifest_path)?;
    let (manifest, manifest_dir) = load_manifest(&config_path)?;
    let drain_config = DrainConfig {
        max_items: drain_config.max_items.max(1),
        deadline: drain_config.deadline,
    };
    let pump_config = PumpConfig {
        max_outstanding: pump_config.max_outstanding.max(1),
    };

    let startup_started_at = now_rfc3339()?;
    let (admin_reload, mut reload_rx) = AdminReloadHandle::channel();

    eprintln!("[harn] orchestrator manifest: {}", config_path.display());
    if let Some(name) = manifest
        .package
        .as_ref()
        .and_then(|package| package.name.as_deref())
    {
        eprintln!("[harn] orchestrator package: {name}");
    }
    eprintln!(
        "[harn] orchestrator role: {} ({})",
        config.role.as_str(),
        config.role.registry_mode()
    );
    eprintln!("[harn] orchestrator state dir: {}", state_dir.display());
    tracing::info!(
        component = "orchestrator",
        trace_id = "",
        role = config.role.as_str(),
        state_dir = %state_dir.display(),
        manifest = %config_path.display(),
        "orchestrator starting"
    );

    let workspace_root = manifest_dir.clone();
    let mut vm = config
        .role
        .build_vm(&workspace_root, &manifest_dir, &state_dir)?;

    let event_log = harn_vm::event_log::active_event_log()
        .ok_or_else(|| "event log was not installed during VM initialization".to_string())?;
    let event_log_description = event_log.describe();
    let tenant_store = if config.role == OrchestratorRole::MultiTenant {
        let store = harn_vm::TenantStore::load(&state_dir)?;
        let active_tenants = store
            .list()
            .into_iter()
            .filter(|tenant| tenant.status == harn_vm::TenantStatus::Active)
            .collect::<Vec<_>>();
        eprintln!(
            "[harn] tenants loaded: {} active ({})",
            active_tenants.len(),
            active_tenants
                .iter()
                .map(|tenant| tenant.scope.id.0.as_str())
                .collect::<Vec<_>>()
                .join(", ")
        );
        Some(Arc::new(store))
    } else {
        None
    };
    eprintln!(
        "[harn] event log: {} {}",
        event_log_description.backend,
        event_log_description
            .location
            .as_ref()
            .map(|path| path.display().to_string())
            .unwrap_or_else(|| "<memory>".to_string())
    );

    let secret_namespace = secret_namespace_for(&manifest_dir);
    let secret_chain_display = configured_secret_chain_display();
    let secret_chain = harn_vm::secrets::configured_default_chain(secret_namespace.clone())
        .map_err(|error| format!("failed to configure secret providers: {error}"))?;
    if secret_chain.providers().is_empty() {
        return Err("secret provider chain resolved to zero providers"
            .to_string()
            .into());
    }
    eprintln!(
        "[harn] secret providers: {} (namespace {})",
        secret_chain_display, secret_namespace
    );
    let secret_provider: Arc<dyn harn_vm::secrets::SecretProvider> = Arc::new(secret_chain);

    let extensions = package::load_runtime_extensions(&config_path);
    let metrics_registry = Arc::new(harn_vm::MetricsRegistry::default());
    harn_vm::install_active_metrics_registry(metrics_registry.clone());
    let collected_triggers = package::collect_manifest_triggers(&mut vm, &extensions)
        .await
        .map_err(|error| format!("failed to collect manifest triggers: {error}"))?;
    package::install_collected_manifest_triggers(&collected_triggers).await?;
    eprintln!(
        "[harn] registered triggers ({}): {}",
        collected_triggers.len(),
        format_trigger_summary(&collected_triggers)
    );

    let binding_versions = live_manifest_binding_versions();
    let route_configs = build_route_configs(&collected_triggers, &binding_versions)?;
    let mut connector_runtime = initialize_connectors(
        &collected_triggers,
        event_log.clone(),
        secret_provider.clone(),
        metrics_registry.clone(),
        &extensions.provider_connectors,
    )
    .await?;
    let route_configs = attach_route_connectors(
        route_configs,
        &connector_runtime.registry,
        &extensions.provider_connectors,
    )?;
    let connector_clients = connector_runtime.registry.client_map().await;
    harn_vm::install_active_connector_clients(connector_clients);
    eprintln!(
        "[harn] registered connectors ({}): {}",
        connector_runtime.providers.len(),
        connector_runtime.providers.join(", ")
    );
    eprintln!(
        "[harn] activated connectors: {}",
        format_activation_summary(&connector_runtime.activations)
    );
    let (mcp_router, mcp_service) = if config.mcp {
        validate_mcp_paths(
            &config.mcp_path,
            &config.mcp_sse_path,
            &config.mcp_messages_path,
        )?;
        if !has_orchestrator_api_keys_configured() && !has_mcp_oauth_configured() {
            return Err(OrchestratorError::Serve(
                "--mcp requires HARN_ORCHESTRATOR_API_KEYS or HARN_MCP_OAUTH_AUTHORIZATION_SERVERS so the embedded MCP management surface is authenticated"
                    .to_string(),
            ));
        }
        let service = Arc::new(
            crate::commands::mcp::serve::McpOrchestratorService::new_local(
                crate::cli::OrchestratorLocalArgs {
                    config: config_path.clone(),
                    state_dir: state_dir.clone(),
                },
            )?,
        );
        let router = crate::commands::mcp::serve::http_router_for_service(
            service.clone(),
            config.mcp_path.clone(),
            config.mcp_sse_path.clone(),
            config.mcp_messages_path.clone(),
        );
        eprintln!(
            "[harn] embedded MCP server mounted at {} (legacy SSE {}, messages {})",
            config.mcp_path, config.mcp_sse_path, config.mcp_messages_path
        );
        (Some(router), Some(service))
    } else {
        (None, None)
    };

    let dispatcher = harn_vm::Dispatcher::with_event_log_and_metrics(
        vm,
        event_log.clone(),
        Some(metrics_registry.clone()),
    );
    let mut pending_pumps = vec![(
        PENDING_TOPIC.to_string(),
        spawn_pending_pump(
            event_log.clone(),
            dispatcher.clone(),
            pump_config,
            metrics_registry.clone(),
            pump_drain_gate.clone(),
            PENDING_TOPIC,
        )?,
    )];
    let mut inbox_pumps = vec![(
        harn_vm::TRIGGER_INBOX_ENVELOPES_TOPIC.to_string(),
        spawn_inbox_pump(
            event_log.clone(),
            dispatcher.clone(),
            pump_config,
            metrics_registry.clone(),
            harn_vm::TRIGGER_INBOX_ENVELOPES_TOPIC,
        )?,
    )];
    if let Some(store) = tenant_store.as_ref() {
        for tenant in store
            .list()
            .into_iter()
            .filter(|tenant| tenant.status == harn_vm::TenantStatus::Active)
        {
            let pending_topic = harn_vm::tenant_topic(
                &tenant.scope.id,
                &harn_vm::event_log::Topic::new(PENDING_TOPIC)
                    .map_err(|error| error.to_string())?,
            )
            .map_err(|error| error.to_string())?;
            pending_pumps.push((
                pending_topic.as_str().to_string(),
                spawn_pending_pump(
                    event_log.clone(),
                    dispatcher.clone(),
                    pump_config,
                    metrics_registry.clone(),
                    pump_drain_gate.clone(),
                    pending_topic.as_str(),
                )?,
            ));
            let inbox_topic = harn_vm::tenant_topic(
                &tenant.scope.id,
                &harn_vm::event_log::Topic::new(harn_vm::TRIGGER_INBOX_ENVELOPES_TOPIC)
                    .map_err(|error| error.to_string())?,
            )
            .map_err(|error| error.to_string())?;
            inbox_pumps.push((
                inbox_topic.as_str().to_string(),
                spawn_inbox_pump(
                    event_log.clone(),
                    dispatcher.clone(),
                    pump_config,
                    metrics_registry.clone(),
                    inbox_topic.as_str(),
                )?,
            ));
        }
    }
    let cron_pump = spawn_cron_pump(
        event_log.clone(),
        dispatcher.clone(),
        pump_config,
        metrics_registry.clone(),
        pump_drain_gate.clone(),
    )?;
    let waitpoint_pump = spawn_waitpoint_resume_pump(
        event_log.clone(),
        dispatcher.clone(),
        pump_config,
        metrics_registry.clone(),
        pump_drain_gate.clone(),
    )?;
    let waitpoint_cancel_pump = spawn_waitpoint_cancel_pump(
        event_log.clone(),
        dispatcher.clone(),
        pump_config,
        metrics_registry.clone(),
        pump_drain_gate.clone(),
    )?;
    let waitpoint_sweeper = spawn_waitpoint_sweeper(dispatcher.clone());

    let listener = ListenerRuntime::start(ListenerConfig {
        bind: config.bind,
        tls: config.tls.clone(),
        event_log: event_log.clone(),
        secrets: secret_provider.clone(),
        allowed_origins: OriginAllowList::from_manifest(&manifest.orchestrator.allowed_origins),
        max_body_bytes: ListenerConfig::max_body_bytes_or_default(
            manifest.orchestrator.max_body_bytes,
        ),
        metrics_registry: metrics_registry.clone(),
        admin_reload: Some(admin_reload.clone()),
        mcp_router,
        routes: route_configs,
        tenant_store: tenant_store.clone(),
        session_store: Some(Arc::new(harn_vm::SessionStore::new(event_log.clone()))),
    })
    .await?;
    let local_bind = listener.local_addr();
    let listener_metrics = listener.trigger_metrics();
    let mut live_manifest = manifest;
    let mut live_triggers = collected_triggers;
    let _manifest_watcher = if config.watch_manifest {
        Some(spawn_manifest_watcher(
            config_path.clone(),
            admin_reload.clone(),
        )?)
    } else {
        None
    };
    connector_runtime.activations = connector_runtime
        .registry
        .activate_all(&connector_runtime.trigger_registry)
        .await
        .map_err(|error| error.to_string())?;
    eprintln!(
        "[harn] activated connectors: {}",
        format_activation_summary(&connector_runtime.activations)
    );

    listener.mark_ready();
    eprintln!("[harn] HTTP listener ready on {}", listener.url());
    tracing::info!(
        component = "orchestrator",
        trace_id = "",
        listener_url = %listener.url(),
        "HTTP listener ready"
    );

    write_state_snapshot(
        &state_dir.join(STATE_SNAPSHOT_FILE),
        &ServeStateSnapshot {
            status: "running".to_string(),
            role: config.role.as_str().to_string(),
            bind: local_bind.to_string(),
            listener_url: listener.url(),
            manifest_path: config_path.display().to_string(),
            state_dir: state_dir.display().to_string(),
            started_at: startup_started_at.clone(),
            stopped_at: None,
            secret_provider_chain: secret_chain_display.clone(),
            event_log_backend: event_log_description.backend.to_string(),
            event_log_location: event_log_description
                .location
                .as_ref()
                .map(|path| path.display().to_string()),
            triggers: trigger_state_snapshots(&live_triggers, &listener_metrics),
            connectors: connector_runtime.providers.clone(),
            activations: connector_runtime
                .activations
                .iter()
                .map(|activation| ConnectorActivationSnapshot {
                    provider: activation.provider.as_str().to_string(),
                    binding_count: activation.binding_count,
                })
                .collect(),
        },
    )?;

    append_lifecycle_event(
        &event_log,
        "startup",
        json!({
            "bind": local_bind.to_string(),
            "manifest": config_path.display().to_string(),
            "role": config.role.as_str(),
            "state_dir": state_dir.display().to_string(),
            "trigger_count": live_triggers.len(),
            "connector_count": connector_runtime.providers.len(),
            "tls_enabled": listener.scheme() == "https",
            "shutdown_timeout_secs": shutdown_timeout.as_secs(),
            "drain_max_items": drain_config.max_items,
            "drain_deadline_secs": drain_config.deadline.as_secs(),
            "pump_max_outstanding": pump_config.max_outstanding,
        }),
    )
    .await?;

    let stranded = stranded_envelopes(&event_log, Duration::ZERO).await?;
    if !stranded.is_empty() {
        eprintln!(
            "[harn] startup found {} stranded inbox envelope(s); inspect with `harn orchestrator queue` and recover explicitly with `harn orchestrator recover --dry-run --envelope-age ...`",
            stranded.len()
        );
    }
    append_lifecycle_event(
        &event_log,
        "startup_stranded_envelopes",
        json!({
            "count": stranded.len(),
        }),
    )
    .await?;

    // Signal that the harness is ready.
    let _ = ready_tx.send(Ok(ReadyState {
        event_log: event_log.clone(),
        listener_url: listener.url(),
        local_addr: local_bind,
        state_dir: state_dir.clone(),
        admin_reload: admin_reload.clone(),
    }));

    // Wait for shutdown trigger or admin reload requests.
    let mut ctx = RuntimeCtx {
        role: config.role,
        config_path: &config_path,
        state_dir: &state_dir,
        bind: local_bind,
        startup_started_at: &startup_started_at,
        event_log: &event_log,
        event_log_description: &event_log_description,
        secret_chain_display: &secret_chain_display,
        listener: &listener,
        connectors: &mut connector_runtime,
        live_manifest: &mut live_manifest,
        live_triggers: &mut live_triggers,
        secret_provider: &secret_provider,
        metrics_registry: &metrics_registry,
        mcp_service: mcp_service.as_ref(),
        reload_rx: &mut reload_rx,
    };

    loop {
        tokio::select! {
            changed = shutdown_rx.changed() => {
                if changed.is_err() || *shutdown_rx.borrow() {
                    break;
                }
            }
            Some(request) = ctx.reload_rx.recv() => {
                handle_reload_request(&mut ctx, request).await?;
            }
        }
    }

    listener.mark_not_ready();
    let shutdown = graceful_shutdown(
        GracefulShutdownCtx {
            role: config.role,
            bind: local_bind,
            listener_url: listener.url(),
            config_path: &config_path,
            state_dir: &state_dir,
            startup_started_at: &startup_started_at,
            event_log: &event_log,
            event_log_description: &event_log_description,
            secret_chain_display: &secret_chain_display,
            triggers: &live_triggers,
            connectors: &connector_runtime,
            shutdown_timeout,
            drain_config,
        },
        listener,
        dispatcher,
        pending_pumps,
        cron_pump,
        inbox_pumps,
        waitpoint_pump,
        waitpoint_cancel_pump,
        waitpoint_sweeper,
    )
    .await;

    if let Some(obs) = observability {
        if let Err(error) = obs.shutdown() {
            if shutdown.is_ok() {
                return Err(OrchestratorError::Serve(error));
            }
            eprintln!("[harn] observability shutdown warning: {error}");
        }
    }
    harn_vm::clear_active_metrics_registry();
    shutdown
}

// ── Internal types ────────────────────────────────────────────────────────────

struct ConnectorRuntime {
    registry: harn_vm::ConnectorRegistry,
    trigger_registry: harn_vm::TriggerRegistry,
    handles: Vec<harn_vm::connectors::ConnectorHandle>,
    providers: Vec<String>,
    activations: Vec<harn_vm::ActivationHandle>,
    #[cfg_attr(not(unix), allow(dead_code))]
    provider_overrides: Vec<ResolvedProviderConnectorConfig>,
}

#[cfg_attr(not(unix), allow(dead_code))]
#[derive(Clone, Debug, Default, Serialize)]
struct ManifestReloadSummary {
    added: Vec<String>,
    modified: Vec<String>,
    removed: Vec<String>,
    unchanged: Vec<String>,
}

#[derive(Clone, Copy, Debug, PartialEq, Eq)]
enum PumpMode {
    Running,
    Draining(PumpDrainRequest),
}

#[derive(Clone, Copy, Debug, PartialEq, Eq)]
struct PumpDrainRequest {
    up_to: u64,
    config: DrainConfig,
    deadline: tokio::time::Instant,
}

#[derive(Clone, Copy, Debug, PartialEq, Eq)]
enum PumpDrainStopReason {
    Drained,
    MaxItems,
    Deadline,
    Error,
}

impl PumpDrainStopReason {
    fn as_str(self) -> &'static str {
        match self {
            Self::Drained => "drained",
            Self::MaxItems => "max_items",
            Self::Deadline => "deadline",
            Self::Error => "error",
        }
    }
}

#[derive(Clone, Copy, Debug, Default)]
struct PumpStats {
    last_seen: u64,
    processed: u64,
}

#[derive(Clone, Copy, Debug)]
struct PumpDrainProgress {
    request: PumpDrainRequest,
    start_seen: u64,
}

#[derive(Clone, Copy, Debug)]
struct PumpDrainReport {
    stats: PumpStats,
    drain_items: u64,
    remaining_queued: u64,
    outstanding_tasks: usize,
    stop_reason: PumpDrainStopReason,
}

impl PumpDrainReport {
    fn truncated(self) -> bool {
        self.remaining_queued > 0 || self.outstanding_tasks > 0
    }
}

struct PumpHandle {
    mode_tx: watch::Sender<PumpMode>,
    join: tokio::task::JoinHandle<Result<PumpDrainReport, OrchestratorError>>,
}

impl PumpHandle {
    async fn drain(
        self,
        log: &Arc<AnyEventLog>,
        topic_name: &str,
        up_to: u64,
        config: DrainConfig,
        overall_deadline: tokio::time::Instant,
    ) -> Result<PumpDrainReport, OrchestratorError> {
        let drain_deadline = std::cmp::min(
            tokio::time::Instant::now() + config.deadline,
            overall_deadline,
        );
        let _ = self.mode_tx.send(PumpMode::Draining(PumpDrainRequest {
            up_to,
            config,
            deadline: drain_deadline,
        }));
        append_pump_lifecycle_event(
            log,
            "pump_drain_started",
            json!({
                "topic": topic_name,
                "up_to": up_to,
                "max_items": config.max_items,
                "drain_deadline_ms": config.deadline.as_millis(),
            }),
        )
        .await?;
        match self.join.await {
            Ok(result) => result,
            Err(error) => Err(format!("pump task join failed: {error}").into()),
        }
    }
}

struct WaitpointSweepHandle {
    stop_tx: watch::Sender<bool>,
    join: tokio::task::JoinHandle<Result<(), OrchestratorError>>,
}

impl WaitpointSweepHandle {
    async fn shutdown(self) -> Result<(), OrchestratorError> {
        let _ = self.stop_tx.send(true);
        match self.join.await {
            Ok(result) => result,
            Err(error) => Err(format!("waitpoint sweeper join failed: {error}").into()),
        }
    }
}

#[derive(Debug, Deserialize)]
struct PendingTriggerRecord {
    trigger_id: String,
    binding_version: u32,
    event: harn_vm::TriggerEvent,
}

// ── Pump spawn functions ──────────────────────────────────────────────────────

fn spawn_pending_pump(
    event_log: Arc<harn_vm::event_log::AnyEventLog>,
    dispatcher: harn_vm::Dispatcher,
    pump_config: PumpConfig,
    metrics_registry: Arc<harn_vm::MetricsRegistry>,
    pump_drain_gate: PumpDrainGate,
    topic_name: &str,
) -> Result<PumpHandle, OrchestratorError> {
    let topic = harn_vm::event_log::Topic::new(topic_name).map_err(|error| error.to_string())?;
    spawn_topic_pump(
        event_log,
        topic,
        pump_config,
        metrics_registry,
        pump_drain_gate,
        move |logged| {
            let dispatcher = dispatcher.clone();
            async move {
                if pending_pump_test_should_fail() {
                    return Err("test pending pump failure".to_string().into());
                }
                if logged.kind != "trigger_event" {
                    return Ok(false);
                }
                let record: PendingTriggerRecord = serde_json::from_value(logged.payload)
                    .map_err(|error| format!("failed to decode pending trigger event: {error}"))?;
                dispatcher
                    .enqueue_targeted_with_headers(
                        Some(record.trigger_id),
                        Some(record.binding_version),
                        record.event,
                        Some(&logged.headers),
                    )
                    .await
                    .map_err(|error| format!("failed to enqueue pending trigger event: {error}"))?;
                Ok(true)
            }
        },
    )
}
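
// The pending pump above decodes `trigger_event` records into
// `PendingTriggerRecord`. A sketch of the expected JSON payload (values are
// illustrative; the `event` field is a serialized `harn_vm::TriggerEvent`):
//
//   { "trigger_id": "example-trigger", "binding_version": 1, "event": { ... } }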

fn spawn_cron_pump(
    event_log: Arc<harn_vm::event_log::AnyEventLog>,
    dispatcher: harn_vm::Dispatcher,
    pump_config: PumpConfig,
    metrics_registry: Arc<harn_vm::MetricsRegistry>,
    pump_drain_gate: PumpDrainGate,
) -> Result<PumpHandle, OrchestratorError> {
    let topic =
        harn_vm::event_log::Topic::new(CRON_TICK_TOPIC).map_err(|error| error.to_string())?;
    spawn_topic_pump(
        event_log,
        topic,
        pump_config,
        metrics_registry,
        pump_drain_gate,
        move |logged| {
            let dispatcher = dispatcher.clone();
            async move {
                if logged.kind != "trigger_event" {
                    return Ok(false);
                }
                let event: harn_vm::TriggerEvent = serde_json::from_value(logged.payload)
                    .map_err(|error| format!("failed to decode cron trigger event: {error}"))?;
                let trigger_id = match &event.provider_payload {
                    harn_vm::ProviderPayload::Known(
                        harn_vm::triggers::event::KnownProviderPayload::Cron(payload),
                    ) => payload.cron_id.clone(),
                    _ => None,
                };
                dispatcher
                    .enqueue_targeted_with_headers(trigger_id, None, event, Some(&logged.headers))
                    .await
                    .map_err(|error| format!("failed to enqueue cron trigger event: {error}"))?;
                Ok(true)
            }
        },
    )
}

fn spawn_inbox_pump(
    event_log: Arc<harn_vm::event_log::AnyEventLog>,
    dispatcher: harn_vm::Dispatcher,
    pump_config: PumpConfig,
    metrics_registry: Arc<harn_vm::MetricsRegistry>,
    topic_name: &str,
) -> Result<PumpHandle, OrchestratorError> {
    let topic = harn_vm::event_log::Topic::new(topic_name).map_err(|error| error.to_string())?;
    let consumer = pump_consumer_id(&topic)?;
    let inbox_task_release_file = inbox_task_test_release_file();
    let (mode_tx, mut mode_rx) = watch::channel(PumpMode::Running);
    let join = tokio::task::spawn_local(async move {
        let start_from = event_log
            .consumer_cursor(&topic, &consumer)
            .await
            .map_err(|error| format!("failed to read consumer cursor for {topic}: {error}"))?
            .or(event_log
                .latest(&topic)
                .await
                .map_err(|error| format!("failed to read topic head {topic}: {error}"))?);
        let mut stream = event_log
            .clone()
            .subscribe(&topic, start_from)
            .await
            .map_err(|error| format!("failed to subscribe topic {topic}: {error}"))?;
        let mut stats = PumpStats {
            last_seen: start_from.unwrap_or(0),
            processed: 0,
        };
        record_pump_metrics(&metrics_registry, &event_log, &topic, stats.last_seen, 0).await?;
        let mut drain_progress = None;
        let mut tasks = JoinSet::new();

        loop {
            if let Some(progress) = drain_progress {
                if let Some(report) = maybe_finish_pump_drain(stats, progress, tasks.len()) {
                    return Ok(report);
                }
            }

            let deadline = drain_progress.map(|progress| progress.request.deadline);
            let mut deadline_wait = Box::pin(async move {
                if let Some(deadline) = deadline {
                    tokio::time::sleep_until(deadline).await;
                } else {
                    std::future::pending::<()>().await;
                }
            });

            tokio::select! {
                changed = mode_rx.changed() => {
                    if changed.is_err() {
                        break;
                    }
                    if let PumpMode::Draining(request) = *mode_rx.borrow() {
                        drain_progress.get_or_insert(PumpDrainProgress {
                            request,
                            start_seen: stats.last_seen,
                        });
                    }
                }
                _ = &mut deadline_wait => {
                    if let Some(progress) = drain_progress {
                        return Ok(pump_drain_report(
                            stats,
                            progress.start_seen,
                            progress.request.up_to,
                            tasks.len(),
                            PumpDrainStopReason::Deadline,
                        ));
                    }
                }
                joined = tasks.join_next(), if !tasks.is_empty() => {
                    match joined {
                        Some(Ok(())) => {
                            record_pump_metrics(
                                &metrics_registry,
                                &event_log,
                                &topic,
                                stats.last_seen,
                                tasks.len(),
                            )
                            .await?;
                        }
                        Some(Err(error)) => {
                            return Err(format!("inbox dispatch task join failed: {error}").into());
                        }
                        None => {}
                    }
                }
                _ = tokio::time::sleep(Duration::from_millis(25)), if tasks.len() >= pump_config.max_outstanding => {
                    record_pump_metrics(&metrics_registry, &event_log, &topic, stats.last_seen, tasks.len()).await?;
                }
                received = stream.next(), if tasks.len() < pump_config.max_outstanding => {
                    let Some(received) = received else {
                        break;
                    };
                    let (event_id, logged) = received
                        .map_err(|error| format!("topic pump read failed for {topic}: {error}"))?;
                    if logged.kind != "event_ingested" {
                        stats.last_seen = event_id;
                        event_log
                            .ack(&topic, &consumer, event_id)
                            .await
                            .map_err(|error| format!("failed to ack topic pump cursor for {topic}: {error}"))?;
                        record_pump_metrics(&metrics_registry, &event_log, &topic, stats.last_seen, tasks.len()).await?;
                        continue;
                    }
                    append_pump_lifecycle_event(
                        &event_log,
                        "pump_received",
                        json!({
                            "topic": topic.as_str(),
                            "event_log_id": event_id,
                            "outstanding": tasks.len(),
                            "max_outstanding": pump_config.max_outstanding,
                        }),
                    )
                    .await?;
                    let envelope: harn_vm::triggers::dispatcher::InboxEnvelope =
                        serde_json::from_value(logged.payload)
                            .map_err(|error| format!("failed to decode dispatcher inbox event: {error}"))?;
                    let trigger_id = envelope.trigger_id.clone();
                    let binding_version = envelope.binding_version;
                    let trigger_event_id = envelope.event.id.0.clone();
                    let parent_headers = logged.headers.clone();
                    append_pump_lifecycle_event(
                        &event_log,
                        "pump_eligible",
                        json!({
                            "topic": topic.as_str(),
                            "event_log_id": event_id,
                            "trigger_id": trigger_id.clone(),
                            "binding_version": binding_version,
                            "trigger_event_id": trigger_event_id,
                        }),
                    )
                    .await?;
                    metrics_registry.record_orchestrator_pump_admission_delay(
                        topic.as_str(),
                        admission_delay(logged.occurred_at_ms),
                    );
                    append_pump_lifecycle_event(
                        &event_log,
                        "pump_admitted",
                        json!({
                            "topic": topic.as_str(),
                            "event_log_id": event_id,
                            "outstanding_after_admit": tasks.len() + 1,
                            "max_outstanding": pump_config.max_outstanding,
                            "trigger_id": trigger_id.clone(),
                            "binding_version": binding_version,
                            "trigger_event_id": trigger_event_id,
                        }),
                    )
                    .await?;
                    let dispatcher = dispatcher.clone();
                    let task_event_log = event_log.clone();
                    let task_topic = topic.as_str().to_string();
                    let inbox_task_release_file = inbox_task_release_file.clone();
                    tasks.spawn_local(async move {
                        if let Some(path) = inbox_task_release_file.as_ref() {
                            wait_for_test_release_file(path).await;
                        }
                        let _ = append_pump_lifecycle_event(
                            &task_event_log,
                            "pump_dispatch_started",
                            json!({
                                "topic": task_topic.clone(),
                                "event_log_id": event_id,
                                "trigger_id": trigger_id,
                                "binding_version": binding_version,
                                "trigger_event_id": trigger_event_id,
                            }),
                        )
                        .await;
                        let result = dispatcher
                            .dispatch_inbox_envelope_with_parent_headers(
                                envelope,
                                &parent_headers,
                            )
                            .await;
                        let (status, error_message) = match result {
                            Ok(_) => ("completed", None),
                            Err(error) => {
                                let message = error.to_string();
                                eprintln!("[harn] inbox dispatch warning: {message}");
                                ("failed", Some(message))
                            }
                        };
                        let _ = append_pump_lifecycle_event(
                            &task_event_log,
                            "pump_dispatch_completed",
                            json!({
                                "topic": task_topic,
                                "event_log_id": event_id,
                                "status": status,
                                "error": error_message,
                            }),
                        )
                        .await;
                    });
                    stats.last_seen = event_id;
                    stats.processed += 1;
                    event_log
                        .ack(&topic, &consumer, event_id)
                        .await
                        .map_err(|error| format!("failed to ack topic pump cursor for {topic}: {error}"))?;
                    append_pump_lifecycle_event(
                        &event_log,
                        "pump_acked",
                        json!({
                            "topic": topic.as_str(),
                            "event_log_id": event_id,
                            "cursor": event_id,
                        }),
                    )
                    .await?;
                    record_pump_metrics(&metrics_registry, &event_log, &topic, stats.last_seen, tasks.len()).await?;
                }
            }
        }

        while let Some(joined) = tasks.join_next().await {
            joined.map_err(|error| format!("inbox dispatch task join failed: {error}"))?;
            record_pump_metrics(
                &metrics_registry,
                &event_log,
                &topic,
                stats.last_seen,
                tasks.len(),
            )
            .await?;
        }

        Ok(drain_progress
            .map(|progress| {
                pump_drain_report(
                    stats,
                    progress.start_seen,
                    progress.request.up_to,
                    0,
                    PumpDrainStopReason::Drained,
                )
            })
            .unwrap_or_else(|| PumpDrainReport {
                stats,
                drain_items: 0,
                remaining_queued: 0,
                outstanding_tasks: 0,
                stop_reason: PumpDrainStopReason::Drained,
            }))
    });
    Ok(PumpHandle { mode_tx, join })
}

fn spawn_waitpoint_resume_pump(
    event_log: Arc<harn_vm::event_log::AnyEventLog>,
    dispatcher: harn_vm::Dispatcher,
    pump_config: PumpConfig,
    metrics_registry: Arc<harn_vm::MetricsRegistry>,
    pump_drain_gate: PumpDrainGate,
) -> Result<PumpHandle, OrchestratorError> {
    let topic = harn_vm::event_log::Topic::new(harn_vm::WAITPOINT_RESUME_TOPIC)
        .map_err(|error| error.to_string())?;
    spawn_topic_pump(
        event_log,
        topic,
        pump_config,
        metrics_registry,
        pump_drain_gate,
        move |logged| {
            let dispatcher = dispatcher.clone();
            async move {
                harn_vm::process_waitpoint_resume_event(&dispatcher, logged)
                    .await
                    .map_err(OrchestratorError::from)
            }
        },
    )
}

fn spawn_waitpoint_cancel_pump(
    event_log: Arc<harn_vm::event_log::AnyEventLog>,
    dispatcher: harn_vm::Dispatcher,
    pump_config: PumpConfig,
    metrics_registry: Arc<harn_vm::MetricsRegistry>,
    pump_drain_gate: PumpDrainGate,
) -> Result<PumpHandle, OrchestratorError> {
    let topic = harn_vm::event_log::Topic::new(harn_vm::TRIGGER_CANCEL_REQUESTS_TOPIC)
        .map_err(|error| error.to_string())?;
    spawn_topic_pump(
        event_log,
        topic,
        pump_config,
        metrics_registry,
        pump_drain_gate,
        move |logged| {
            let dispatcher = dispatcher.clone();
            async move {
                if logged.kind != "dispatch_cancel_requested" {
                    return Ok(false);
                }
                harn_vm::service_waitpoints_once(&dispatcher, None)
                    .await
                    .map_err(|error| {
                        OrchestratorError::Serve(format!(
                            "failed to service waitpoints on cancel: {error}"
                        ))
                    })?;
                Ok(true)
            }
        },
    )
}

fn spawn_waitpoint_sweeper(dispatcher: harn_vm::Dispatcher) -> WaitpointSweepHandle {
    let (stop_tx, mut stop_rx) = watch::channel(false);
    let join = tokio::task::spawn_local(async move {
        let mut interval = tokio::time::interval(WAITPOINT_SERVICE_INTERVAL);
        interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
        loop {
            tokio::select! {
                changed = stop_rx.changed() => {
                    if changed.is_err() || *stop_rx.borrow() {
                        break;
                    }
                }
                _ = interval.tick() => {
                    harn_vm::service_waitpoints_once(&dispatcher, None)
                        .await
                        .map_err(|error| format!("failed to service waitpoints on sweep: {error}"))?;
                }
            }
        }
        Ok(())
    });
    WaitpointSweepHandle { stop_tx, join }
}

fn spawn_topic_pump<F, Fut>(
    event_log: Arc<harn_vm::event_log::AnyEventLog>,
    topic: harn_vm::event_log::Topic,
    _pump_config: PumpConfig,
    metrics_registry: Arc<harn_vm::MetricsRegistry>,
    pump_drain_gate: PumpDrainGate,
    process: F,
) -> Result<PumpHandle, OrchestratorError>
where
    F: Fn(harn_vm::event_log::LogEvent) -> Fut + 'static,
    Fut: std::future::Future<Output = Result<bool, OrchestratorError>> + 'static,
{
    let consumer = pump_consumer_id(&topic)?;
    let mut pump_drain_gate_rx = pump_drain_gate.subscribe();
    let (mode_tx, mut mode_rx) = watch::channel(PumpMode::Running);
    let join = tokio::task::spawn_local(async move {
        let start_from = event_log
            .consumer_cursor(&topic, &consumer)
            .await
            .map_err(|error| format!("failed to read consumer cursor for {topic}: {error}"))?
            .or(event_log
                .latest(&topic)
                .await
                .map_err(|error| format!("failed to read topic head {topic}: {error}"))?);
        let mut stream = event_log
            .clone()
            .subscribe(&topic, start_from)
            .await
            .map_err(|error| format!("failed to subscribe topic {topic}: {error}"))?;
        let mut stats = PumpStats {
            last_seen: start_from.unwrap_or(0),
            processed: 0,
        };
        record_pump_metrics(&metrics_registry, &event_log, &topic, stats.last_seen, 0).await?;
        let mut drain_progress = None;
        loop {
            if let Some(progress) = drain_progress {
                if let Some(report) = maybe_finish_pump_drain(stats, progress, 0) {
                    return Ok(report);
                }
            }
            let deadline = drain_progress.map(|progress| progress.request.deadline);
            let mut deadline_wait = Box::pin(async move {
                if let Some(deadline) = deadline {
                    tokio::time::sleep_until(deadline).await;
                } else {
                    std::future::pending::<()>().await;
                }
            });
            tokio::select! {
                changed = mode_rx.changed() => {
                    if changed.is_err() {
                        break;
                    }
                    if let PumpMode::Draining(request) = *mode_rx.borrow() {
                        drain_progress.get_or_insert(PumpDrainProgress {
                            request,
                            start_seen: stats.last_seen,
                        });
                    }
                }
                _ = &mut deadline_wait => {
                    if let Some(progress) = drain_progress {
                        return Ok(pump_drain_report(
                            stats,
                            progress.start_seen,
                            progress.request.up_to,
                            0,
                            PumpDrainStopReason::Deadline,
                        ));
                    }
                }
                received = stream.next() => {
                    let Some(received) = received else {
                        break;
                    };
                    let (event_id, logged) = received
                        .map_err(|error| format!("topic pump read failed for {topic}: {error}"))?;
                    record_pump_metrics(&metrics_registry, &event_log, &topic, stats.last_seen, 1).await?;
                    metrics_registry.record_orchestrator_pump_admission_delay(
                        topic.as_str(),
                        admission_delay(logged.occurred_at_ms),
                    );
                    wait_for_pump_drain_release(
                        &event_log,
                        &topic,
                        event_id,
                        &mut pump_drain_gate_rx,
                    )
                    .await?;
                    let handled = process(logged).await?;
                    stats.last_seen = event_id;
                    if handled {
                        stats.processed += 1;
                    }
                    event_log
                        .ack(&topic, &consumer, event_id)
                        .await
                        .map_err(|error| format!("failed to ack topic pump cursor for {topic}: {error}"))?;
                    record_pump_metrics(&metrics_registry, &event_log, &topic, stats.last_seen, 0).await?;
                }
            }
        }
        Ok(drain_progress
            .map(|progress| {
                pump_drain_report(
                    stats,
                    progress.start_seen,
                    progress.request.up_to,
                    0,
                    PumpDrainStopReason::Drained,
                )
            })
            .unwrap_or_else(|| PumpDrainReport {
                stats,
                drain_items: 0,
                remaining_queued: 0,
                outstanding_tasks: 0,
                stop_reason: PumpDrainStopReason::Drained,
            }))
    });
    Ok(PumpHandle { mode_tx, join })
}
1503
1504// ── Graceful shutdown ─────────────────────────────────────────────────────────
1505
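/// Drives the orchestrator through an ordered graceful shutdown: emit a
/// `draining` lifecycle event, stop the HTTP listener and connectors, drain
/// each topic pump (pending, cron, inbox, waitpoint resume/cancel) within the
/// shared deadline, stop the waitpoint sweeper, drain the dispatcher, then
/// record a `stopped` lifecycle event and write the final state snapshot.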
1506#[allow(clippy::too_many_arguments)]
1507async fn graceful_shutdown(
1508    ctx: GracefulShutdownCtx<'_>,
1509    listener: ListenerRuntime,
1510    dispatcher: harn_vm::Dispatcher,
1511    pending_pumps: Vec<(String, PumpHandle)>,
1512    cron_pump: PumpHandle,
1513    inbox_pumps: Vec<(String, PumpHandle)>,
1514    waitpoint_pump: PumpHandle,
1515    waitpoint_cancel_pump: PumpHandle,
1516    waitpoint_sweeper: WaitpointSweepHandle,
1517) -> Result<(), OrchestratorError> {
1518    eprintln!("[harn] signal received, starting graceful shutdown...");
1519    tracing::info!(
1520        component = "orchestrator",
1521        trace_id = "",
1522        shutdown_timeout_secs = ctx.shutdown_timeout.as_secs(),
1523        "signal received, starting graceful shutdown"
1524    );
1525    let listener_in_flight = listener
1526        .trigger_metrics()
1527        .into_values()
1528        .map(|metrics| metrics.in_flight)
1529        .sum::<u64>();
1530    let dispatcher_before = dispatcher.snapshot();
1531    append_lifecycle_event(
1532        ctx.event_log,
1533        "draining",
1534        json!({
1535            "bind": ctx.bind.to_string(),
1536            "role": ctx.role.as_str(),
1537            "status": "draining",
1538            "http_in_flight": listener_in_flight,
1539            "dispatcher_in_flight": dispatcher_before.in_flight,
1540            "dispatcher_retry_queue_depth": dispatcher_before.retry_queue_depth,
1541            "dispatcher_dlq_depth": dispatcher_before.dlq_depth,
1542            "shutdown_timeout_secs": ctx.shutdown_timeout.as_secs(),
1543            "drain_max_items": ctx.drain_config.max_items,
1544            "drain_deadline_secs": ctx.drain_config.deadline.as_secs(),
1545        }),
1546    )
1547    .await?;
1548
1549    let deadline = tokio::time::Instant::now() + ctx.shutdown_timeout;
1550    let listener_metrics = listener.shutdown(remaining_budget(deadline)).await?;
1551    for handle in &ctx.connectors.handles {
1552        let connector = handle.lock().await;
1553        if let Err(error) = connector.shutdown(remaining_budget(deadline)).await {
1554            eprintln!(
1555                "[harn] connector {} shutdown warning: {error}",
1556                connector.provider_id().as_str()
1557            );
1558        }
1559    }
1560
1561    let mut pending_processed = 0;
1562    for (topic_name, pump) in pending_pumps {
1563        let stats =
1564            drain_pump_best_effort(ctx.event_log, &topic_name, pump, ctx.drain_config, deadline)
1565                .await?;
1566        pending_processed += stats.stats.processed;
1567        emit_drain_truncated(ctx.event_log, &topic_name, stats, ctx.drain_config).await?;
1568    }
1569    let cron_stats = drain_pump_best_effort(
1570        ctx.event_log,
1571        CRON_TICK_TOPIC,
1572        cron_pump,
1573        ctx.drain_config,
1574        deadline,
1575    )
1576    .await?;
1577    emit_drain_truncated(ctx.event_log, CRON_TICK_TOPIC, cron_stats, ctx.drain_config).await?;
1578    let mut inbox_processed = 0;
1579    for (topic_name, pump) in inbox_pumps {
1580        let stats =
1581            drain_pump_best_effort(ctx.event_log, &topic_name, pump, ctx.drain_config, deadline)
1582                .await?;
1583        inbox_processed += stats.stats.processed;
1584        emit_drain_truncated(ctx.event_log, &topic_name, stats, ctx.drain_config).await?;
1585    }
1586    let waitpoint_stats = waitpoint_pump
1587        .drain(
1588            ctx.event_log,
1589            harn_vm::WAITPOINT_RESUME_TOPIC,
1590            topic_latest_id(ctx.event_log, harn_vm::WAITPOINT_RESUME_TOPIC).await?,
1591            ctx.drain_config,
1592            deadline,
1593        )
1594        .await?;
1595    emit_drain_truncated(
1596        ctx.event_log,
1597        harn_vm::WAITPOINT_RESUME_TOPIC,
1598        waitpoint_stats,
1599        ctx.drain_config,
1600    )
1601    .await?;
1602    let waitpoint_cancel_stats = waitpoint_cancel_pump
1603        .drain(
1604            ctx.event_log,
1605            harn_vm::TRIGGER_CANCEL_REQUESTS_TOPIC,
1606            topic_latest_id(ctx.event_log, harn_vm::TRIGGER_CANCEL_REQUESTS_TOPIC).await?,
1607            ctx.drain_config,
1608            deadline,
1609        )
1610        .await?;
1611    emit_drain_truncated(
1612        ctx.event_log,
1613        harn_vm::TRIGGER_CANCEL_REQUESTS_TOPIC,
1614        waitpoint_cancel_stats,
1615        ctx.drain_config,
1616    )
1617    .await?;
1618    waitpoint_sweeper.shutdown().await?;
1619    let drain_report = dispatcher
1620        .drain(remaining_budget(deadline))
1621        .await
1622        .map_err(|error| format!("failed to drain dispatcher: {error}"))?;
1623
1624    let stopped_at = now_rfc3339()?;
1625    let timed_out = !drain_report.drained;
1626    if timed_out {
1627        dispatcher.shutdown();
1628    }
1629    append_lifecycle_event(
1630        ctx.event_log,
1631        "stopped",
1632        json!({
1633            "bind": ctx.bind.to_string(),
1634            "role": ctx.role.as_str(),
1635            "status": "stopped",
1636            "http_in_flight": listener_in_flight,
1637            "dispatcher_in_flight": drain_report.in_flight,
1638            "dispatcher_retry_queue_depth": drain_report.retry_queue_depth,
1639            "dispatcher_dlq_depth": drain_report.dlq_depth,
1640            "pending_events_drained": pending_processed,
1641            "cron_events_drained": cron_stats.stats.processed,
1642            "inbox_events_drained": inbox_processed,
1643            "waitpoint_events_drained": waitpoint_stats.stats.processed,
1644            "waitpoint_cancel_events_drained": waitpoint_cancel_stats.stats.processed,
1645            "timed_out": timed_out,
1646        }),
1647    )
1648    .await?;
1649    ctx.event_log
1650        .flush()
1651        .await
1652        .map_err(|error| format!("failed to flush event log: {error}"))?;
1653
1654    write_state_snapshot(
1655        &ctx.state_dir.join(STATE_SNAPSHOT_FILE),
1656        &ServeStateSnapshot {
1657            status: "stopped".to_string(),
1658            role: ctx.role.as_str().to_string(),
1659            bind: ctx.bind.to_string(),
1660            listener_url: ctx.listener_url.clone(),
1661            manifest_path: ctx.config_path.display().to_string(),
1662            state_dir: ctx.state_dir.display().to_string(),
1663            started_at: ctx.startup_started_at.to_string(),
1664            stopped_at: Some(stopped_at),
1665            secret_provider_chain: ctx.secret_chain_display.to_string(),
1666            event_log_backend: ctx.event_log_description.backend.to_string(),
1667            event_log_location: ctx
1668                .event_log_description
1669                .location
1670                .as_ref()
1671                .map(|path| path.display().to_string()),
1672            triggers: trigger_state_snapshots(ctx.triggers, &listener_metrics),
1673            connectors: ctx.connectors.providers.clone(),
1674            activations: ctx
1675                .connectors
1676                .activations
1677                .iter()
1678                .map(|activation| ConnectorActivationSnapshot {
1679                    provider: activation.provider.as_str().to_string(),
1680                    binding_count: activation.binding_count,
1681                })
1682                .collect(),
1683        },
1684    )?;
1685
1686    if timed_out {
1687        eprintln!(
1688            "[harn] graceful shutdown timed out with {} dispatches and {} retry waits remaining",
1689            drain_report.in_flight, drain_report.retry_queue_depth
1690        );
1691    }
1692    eprintln!("[harn] graceful shutdown complete");
1693    tracing::info!(
1694        component = "orchestrator",
1695        trace_id = "",
1696        "graceful shutdown complete"
1697    );
1698    Ok(())
1699}
1700
1701struct GracefulShutdownCtx<'a> {
1702    role: OrchestratorRole,
1703    bind: SocketAddr,
1704    listener_url: String,
1705    config_path: &'a Path,
1706    state_dir: &'a Path,
1707    startup_started_at: &'a str,
1708    event_log: &'a Arc<AnyEventLog>,
1709    event_log_description: &'a harn_vm::event_log::EventLogDescription,
1710    secret_chain_display: &'a str,
1711    triggers: &'a [CollectedManifestTrigger],
1712    connectors: &'a ConnectorRuntime,
1713    shutdown_timeout: Duration,
1714    drain_config: DrainConfig,
1715}
1716
1717// ── Manifest reload ───────────────────────────────────────────────────────────
1718
1719struct RuntimeCtx<'a> {
1720    role: OrchestratorRole,
1721    config_path: &'a Path,
1722    state_dir: &'a Path,
1723    bind: SocketAddr,
1724    startup_started_at: &'a str,
1725    event_log: &'a Arc<AnyEventLog>,
1726    event_log_description: &'a harn_vm::event_log::EventLogDescription,
1727    secret_chain_display: &'a str,
1728    listener: &'a ListenerRuntime,
1729    connectors: &'a mut ConnectorRuntime,
1730    live_manifest: &'a mut Manifest,
1731    live_triggers: &'a mut Vec<CollectedManifestTrigger>,
1732    secret_provider: &'a Arc<dyn harn_vm::secrets::SecretProvider>,
1733    metrics_registry: &'a Arc<harn_vm::MetricsRegistry>,
1734    mcp_service: Option<&'a Arc<crate::commands::mcp::serve::McpOrchestratorService>>,
1735    #[cfg_attr(not(unix), allow(dead_code))]
1736    reload_rx: &'a mut mpsc::UnboundedReceiver<AdminReloadRequest>,
1737}
1738
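/// Applies an admin- or watcher-initiated manifest reload. On success it
/// notifies the embedded MCP service (when enabled), rewrites the running
/// state snapshot, and records a `reload_succeeded` manifest event; on failure
/// it records `reload_failed`. Either way the outcome is sent back on the
/// request's response channel when one is attached.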
1739#[cfg_attr(not(unix), allow(dead_code))]
1740async fn handle_reload_request(
1741    ctx: &mut RuntimeCtx<'_>,
1742    request: AdminReloadRequest,
1743) -> Result<(), OrchestratorError> {
1744    let source = request.source.clone();
1745    match reload_manifest(ctx).await {
1746        Ok(summary) => {
1747            if let Some(mcp_service) = ctx.mcp_service {
1748                mcp_service.notify_manifest_reloaded();
1749            }
1750            write_running_state_snapshot(ctx)?;
1751            append_manifest_event(
1752                ctx.event_log,
1753                "reload_succeeded",
1754                json!({
1755                    "source": source,
1756                    "summary": summary,
1757                }),
1758            )
1759            .await?;
1760            eprintln!(
1761                "[harn] manifest reload ({source}) applied: +{} ~{} -{}",
1762                summary.added.len(),
1763                summary.modified.len(),
1764                summary.removed.len()
1765            );
1766            if let Some(response_tx) = request.response_tx {
1767                let _ = response_tx.send(serde_json::to_value(&summary).map_err(|error| {
1768                    OrchestratorError::Serve(format!("failed to encode reload summary: {error}"))
1769                }));
1770            }
1771        }
1772        Err(error) => {
1773            eprintln!("[harn] manifest reload ({source}) failed: {error}");
1774            append_manifest_event(
1775                ctx.event_log,
1776                "reload_failed",
1777                json!({
1778                    "source": source,
1779                    "error": error.to_string(),
1780                }),
1781            )
1782            .await?;
1783            if let Some(response_tx) = request.response_tx {
1784                let _ = response_tx.send(Err(error));
1785            }
1786        }
1787    }
1788    Ok(())
1789}
1790
1791#[cfg_attr(not(unix), allow(dead_code))]
1792fn write_running_state_snapshot(ctx: &RuntimeCtx<'_>) -> Result<(), OrchestratorError> {
1793    let listener_metrics = ctx.listener.trigger_metrics();
1794    write_state_snapshot(
1795        &ctx.state_dir.join(STATE_SNAPSHOT_FILE),
1796        &ServeStateSnapshot {
1797            status: "running".to_string(),
1798            role: ctx.role.as_str().to_string(),
1799            bind: ctx.bind.to_string(),
1800            listener_url: ctx.listener.url(),
1801            manifest_path: ctx.config_path.display().to_string(),
1802            state_dir: ctx.state_dir.display().to_string(),
1803            started_at: ctx.startup_started_at.to_string(),
1804            stopped_at: None,
1805            secret_provider_chain: ctx.secret_chain_display.to_string(),
1806            event_log_backend: ctx.event_log_description.backend.to_string(),
1807            event_log_location: ctx
1808                .event_log_description
1809                .location
1810                .as_ref()
1811                .map(|path| path.display().to_string()),
1812            triggers: trigger_state_snapshots(ctx.live_triggers, &listener_metrics),
1813            connectors: ctx.connectors.providers.clone(),
1814            activations: ctx
1815                .connectors
1816                .activations
1817                .iter()
1818                .map(|activation| ConnectorActivationSnapshot {
1819                    provider: activation.provider.as_str().to_string(),
1820                    binding_count: activation.binding_count,
1821                })
1822                .collect(),
1823        },
1824    )
1825}
1826
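/// Re-reads the manifest from disk, collects its triggers, and swaps them into
/// the live runtime. Connectors are rebuilt only when their fingerprint map
/// changed. If the new route table cannot be built or installed on the
/// listener, the previous manifest and triggers are rolled back before the
/// error is returned.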
1827#[cfg_attr(not(unix), allow(dead_code))]
1828async fn reload_manifest(
1829    ctx: &mut RuntimeCtx<'_>,
1830) -> Result<ManifestReloadSummary, OrchestratorError> {
1831    let (manifest, manifest_dir) = load_manifest(ctx.config_path)?;
1832    let mut vm = ctx
1833        .role
1834        .build_vm(&manifest_dir, &manifest_dir, ctx.state_dir)?;
1835    let extensions = package::load_runtime_extensions(ctx.config_path);
1836    let collected_triggers = package::collect_manifest_triggers(&mut vm, &extensions)
1837        .await
1838        .map_err(|error| format!("failed to collect manifest triggers: {error}"))?;
1839    let summary = summarize_manifest_reload(ctx.live_triggers, &collected_triggers);
1840    let connector_reload =
1841        connector_reload_fingerprint_map(ctx.live_triggers, &ctx.connectors.provider_overrides)
1842            != connector_reload_fingerprint_map(
1843                &collected_triggers,
1844                &extensions.provider_connectors,
1845            );
1846    let next_connector_runtime = if connector_reload {
1847        let mut runtime = initialize_connectors(
1848            &collected_triggers,
1849            ctx.event_log.clone(),
1850            ctx.secret_provider.clone(),
1851            ctx.metrics_registry.clone(),
1852            &extensions.provider_connectors,
1853        )
1854        .await?;
1855        runtime.activations = runtime
1856            .registry
1857            .activate_all(&runtime.trigger_registry)
1858            .await
1859            .map_err(|error| error.to_string())?;
1860        Some(runtime)
1861    } else {
1862        None
1863    };
1864    let previous_manifest = ctx.live_manifest.clone();
1865    let previous_triggers = ctx.live_triggers.clone();
1866    package::install_collected_manifest_triggers(&collected_triggers).await?;
1867    let binding_versions = live_manifest_binding_versions();
1868    let route_registry = next_connector_runtime
1869        .as_ref()
1870        .map(|runtime| &runtime.registry)
1871        .unwrap_or(&ctx.connectors.registry);
1872    let route_overrides = next_connector_runtime
1873        .as_ref()
1874        .map(|runtime| runtime.provider_overrides.as_slice())
1875        .unwrap_or(ctx.connectors.provider_overrides.as_slice());
1876    let route_configs = match build_route_configs(&collected_triggers, &binding_versions)
1877        .and_then(|routes| attach_route_connectors(routes, route_registry, route_overrides))
1878    {
1879        Ok(routes) => routes,
1880        Err(error) => {
1881            rollback_manifest_reload(ctx, &previous_manifest, &previous_triggers)
1882                .await
1883                .map_err(|rollback| format!("{error}; rollback failed: {rollback}"))?;
1884            return Err(error);
1885        }
1886    };
1887    if let Err(error) = ctx.listener.reload_routes(route_configs) {
1888        rollback_manifest_reload(ctx, &previous_manifest, &previous_triggers)
1889            .await
1890            .map_err(|rollback| format!("{error}; rollback failed: {rollback}"))?;
1891        return Err(error);
1892    }
1893    if let Some(runtime) = next_connector_runtime {
1894        let previous_handles = ctx.connectors.handles.clone();
1895        let connector_clients = runtime.registry.client_map().await;
1896        harn_vm::install_active_connector_clients(connector_clients);
1897        *ctx.connectors = runtime;
1898        for handle in previous_handles {
1899            let connector = handle.lock().await;
1900            if let Err(error) = connector.shutdown(Duration::from_secs(5)).await {
1901                eprintln!(
1902                    "[harn] connector {} reload shutdown warning: {error}",
1903                    connector.provider_id().as_str()
1904                );
1905            }
1906        }
1907    }
1908    *ctx.live_manifest = manifest;
1909    *ctx.live_triggers = collected_triggers;
1910    Ok(summary)
1911}
1912
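/// Restores the previously installed triggers and listener routes after a
/// failed reload, then points the live manifest and trigger state back at the
/// prior snapshot.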
1913#[cfg_attr(not(unix), allow(dead_code))]
1914async fn rollback_manifest_reload(
1915    ctx: &mut RuntimeCtx<'_>,
1916    previous_manifest: &Manifest,
1917    previous_triggers: &[CollectedManifestTrigger],
1918) -> Result<(), OrchestratorError> {
1919    package::install_collected_manifest_triggers(previous_triggers).await?;
1920    let binding_versions = live_manifest_binding_versions();
1921    let route_configs = build_route_configs(previous_triggers, &binding_versions)?;
1922    let route_configs = attach_route_connectors(
1923        route_configs,
1924        &ctx.connectors.registry,
1925        &ctx.connectors.provider_overrides,
1926    )?;
1927    ctx.listener.reload_routes(route_configs)?;
1928    *ctx.live_manifest = previous_manifest.clone();
1929    *ctx.live_triggers = previous_triggers.to_vec();
1930    Ok(())
1931}
1932
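/// Diffs the current and next trigger sets by definition fingerprint and
/// buckets each trigger id into added, removed, modified, or unchanged.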
1933#[cfg_attr(not(unix), allow(dead_code))]
1934fn summarize_manifest_reload(
1935    current: &[CollectedManifestTrigger],
1936    next: &[CollectedManifestTrigger],
1937) -> ManifestReloadSummary {
1938    let current_map = trigger_fingerprint_map(current, true);
1939    let next_map = trigger_fingerprint_map(next, true);
1940    let mut summary = ManifestReloadSummary::default();
1941    let ids: BTreeSet<String> = current_map.keys().chain(next_map.keys()).cloned().collect();
1942    for id in ids {
1943        match (current_map.get(&id), next_map.get(&id)) {
1944            (None, Some(_)) => summary.added.push(id),
1945            (Some(_), None) => summary.removed.push(id),
1946            (Some(left), Some(right)) if left == right => summary.unchanged.push(id),
1947            (Some(_), Some(_)) => summary.modified.push(id),
1948            (None, None) => {}
1949        }
1950    }
1951    summary
1952}
1953
1954#[cfg_attr(not(unix), allow(dead_code))]
1955fn trigger_fingerprint_map(
1956    triggers: &[CollectedManifestTrigger],
1957    include_http_managed: bool,
1958) -> BTreeMap<String, String> {
1959    triggers
1960        .iter()
1961        .filter(|trigger| include_http_managed || !is_http_managed_trigger(trigger))
1962        .map(|trigger| {
1963            let spec = package::manifest_trigger_binding_spec(trigger.clone());
1964            (trigger.config.id.clone(), spec.definition_fingerprint)
1965        })
1966        .collect()
1967}
1968
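/// Builds a per-provider fingerprint list used to decide whether connectors
/// must be rebuilt on reload. Webhook and A2A-push triggers are skipped unless
/// the provider's connector owns ingress, and provider connector overrides
/// contribute their own fingerprint.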
1969#[cfg_attr(not(unix), allow(dead_code))]
1970fn connector_reload_fingerprint_map(
1971    triggers: &[CollectedManifestTrigger],
1972    provider_overrides: &[ResolvedProviderConnectorConfig],
1973) -> BTreeMap<String, Vec<String>> {
1974    let mut by_provider = BTreeMap::<String, Vec<String>>::new();
1975    for trigger in triggers {
1976        let provider = trigger.config.provider.as_str().to_string();
1977        if !connector_owns_ingress(&provider, provider_overrides)
1978            && matches!(
1979                trigger.config.kind,
1980                crate::package::TriggerKind::Webhook | crate::package::TriggerKind::A2aPush
1981            )
1982        {
1983            continue;
1984        }
1985        let spec = package::manifest_trigger_binding_spec(trigger.clone());
1986        by_provider
1987            .entry(provider)
1988            .or_default()
1989            .push(spec.definition_fingerprint);
1990    }
1991    for override_config in provider_overrides {
1992        by_provider
1993            .entry(override_config.id.as_str().to_string())
1994            .or_default()
1995            .push(provider_connector_fingerprint(override_config));
1996    }
1997    for fingerprints in by_provider.values_mut() {
1998        fingerprints.sort();
1999    }
2000    by_provider
2001}
2002
2003#[cfg_attr(not(unix), allow(dead_code))]
2004fn provider_connector_fingerprint(config: &ResolvedProviderConnectorConfig) -> String {
2005    match &config.connector {
2006        ResolvedProviderConnectorKind::RustBuiltin => format!(
2007            "{}::builtin@{}",
2008            config.id.as_str(),
2009            config.manifest_dir.display()
2010        ),
2011        ResolvedProviderConnectorKind::Harn { module } => format!(
2012            "{}::harn:{}@{}",
2013            config.id.as_str(),
2014            module,
2015            config.manifest_dir.display()
2016        ),
2017        ResolvedProviderConnectorKind::Invalid(message) => format!(
2018            "{}::invalid:{}@{}",
2019            config.id.as_str(),
2020            message,
2021            config.manifest_dir.display()
2022        ),
2023    }
2024}
2025
2026#[cfg_attr(not(unix), allow(dead_code))]
2027fn is_http_managed_trigger(trigger: &CollectedManifestTrigger) -> bool {
2028    matches!(
2029        trigger.config.kind,
2030        crate::package::TriggerKind::Webhook | crate::package::TriggerKind::A2aPush
2031    )
2032}
2033
2034// ── Connector helpers ─────────────────────────────────────────────────────────
2035
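/// Groups manifest triggers by provider, registers a connector for each
/// provider (a manifest override, a builtin such as cron, or a placeholder),
/// and initializes every connector with a shared `ConnectorCtx`. Package-backed
/// providers without a Harn connector override are rejected with a
/// configuration error.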
2036async fn initialize_connectors(
2037    triggers: &[CollectedManifestTrigger],
2038    event_log: Arc<harn_vm::event_log::AnyEventLog>,
2039    secrets: Arc<dyn harn_vm::secrets::SecretProvider>,
2040    metrics: Arc<harn_vm::MetricsRegistry>,
2041    provider_overrides: &[ResolvedProviderConnectorConfig],
2042) -> Result<ConnectorRuntime, OrchestratorError> {
2043    let mut registry = harn_vm::ConnectorRegistry::default();
2044    let mut trigger_registry = harn_vm::TriggerRegistry::default();
2045    let mut grouped_kinds: BTreeMap<harn_vm::ProviderId, BTreeSet<String>> = BTreeMap::new();
2046
2047    for trigger in triggers {
2048        let binding = trigger_binding_for(&trigger.config)?;
2049        grouped_kinds
2050            .entry(binding.provider.clone())
2051            .or_default()
2052            .insert(binding.kind.as_str().to_string());
2053        trigger_registry.register(binding);
2054    }
2055
2056    let ctx = harn_vm::ConnectorCtx {
2057        inbox: Arc::new(
2058            harn_vm::InboxIndex::new(event_log.clone(), metrics.clone())
2059                .await
2060                .map_err(|error| error.to_string())?,
2061        ),
2062        event_log,
2063        secrets,
2064        metrics,
2065        rate_limiter: Arc::new(harn_vm::RateLimiterFactory::default()),
2066    };
2067
2068    let mut providers = Vec::new();
2069    let mut handles = Vec::new();
2070    for (provider, kinds) in grouped_kinds {
2071        let provider_name = provider.as_str().to_string();
2072        if let Some(connector) = connector_override_for(&provider, provider_overrides).await? {
2073            registry.remove(&provider);
2074            registry
2075                .register(connector)
2076                .map_err(|error| error.to_string())?;
2077        }
2078        if registry.get(&provider).is_none() {
2079            if provider_requires_harn_connector(provider.as_str()) {
2080                return Err(format!(
2081                    "provider '{}' is package-backed; add [[providers]] id = \"{}\" with \
2082                     connector = {{ harn = \"...\" }} to the manifest",
2083                    provider.as_str(),
2084                    provider.as_str()
2085                )
2086                .into());
2087            }
2088            let connector = connector_for(&provider, kinds);
2089            registry
2090                .register(connector)
2091                .map_err(|error| error.to_string())?;
2092        }
2093        let handle = registry
2094            .get(&provider)
2095            .ok_or_else(|| format!("connector registry lost provider '{}'", provider.as_str()))?;
2096        handle
2097            .lock()
2098            .await
2099            .init(ctx.clone())
2100            .await
2101            .map_err(|error| error.to_string())?;
2102        handles.push(handle.clone());
2103        providers.push(provider_name);
2104    }
2105
2106    Ok(ConnectorRuntime {
2107        registry,
2108        trigger_registry,
2109        handles,
2110        providers,
2111        activations: Vec::new(),
2112        provider_overrides: provider_overrides.to_vec(),
2113    })
2114}
2115
2116fn trigger_binding_for(
2117    config: &ResolvedTriggerConfig,
2118) -> Result<harn_vm::TriggerBinding, OrchestratorError> {
2119    Ok(harn_vm::TriggerBinding {
2120        provider: config.provider.clone(),
2121        kind: harn_vm::TriggerKind::from(trigger_kind_name(config.kind)),
2122        binding_id: config.id.clone(),
2123        dedupe_key: config.dedupe_key.clone(),
2124        dedupe_retention_days: config.retry.retention_days,
2125        config: connector_binding_config(config)?,
2126    })
2127}
2128
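/// Encodes the trigger's kind-specific settings into the JSON shape the
/// connector expects for its binding: cron triggers pass their schedule config
/// through directly, while webhook/poll/stream/a2a-push triggers wrap match
/// rules, secrets, and kind-specific settings under named keys.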
2129fn connector_binding_config(
2130    config: &ResolvedTriggerConfig,
2131) -> Result<JsonValue, OrchestratorError> {
2132    match config.kind {
2133        crate::package::TriggerKind::Cron => {
2134            serde_json::to_value(&config.kind_specific).map_err(|error| {
2135                OrchestratorError::Serve({
2136                    format!(
2137                        "failed to encode cron trigger config '{}': {error}",
2138                        config.id
2139                    )
2140                })
2141            })
2142        }
2143        crate::package::TriggerKind::Webhook => Ok(serde_json::json!({
2144            "match": config.match_,
2145            "secrets": config.secrets,
2146            "webhook": config.kind_specific,
2147        })),
2148        crate::package::TriggerKind::Poll => Ok(serde_json::json!({
2149            "match": config.match_,
2150            "secrets": config.secrets,
2151            "poll": config.kind_specific,
2152        })),
2153        crate::package::TriggerKind::Stream => Ok(serde_json::json!({
2154            "match": config.match_,
2155            "secrets": config.secrets,
2156            "stream": config.kind_specific,
2157            "window": config.window,
2158        })),
2159        crate::package::TriggerKind::A2aPush => Ok(serde_json::json!({
2160            "match": config.match_,
2161            "secrets": config.secrets,
2162            "a2a_push": a2a_push_connector_config(&config.kind_specific)?,
2163        })),
2164        _ => Ok(JsonValue::Null),
2165    }
2166}
2167
2168fn a2a_push_connector_config(
2169    kind_specific: &BTreeMap<String, toml::Value>,
2170) -> Result<JsonValue, OrchestratorError> {
2171    if let Some(nested) = kind_specific.get("a2a_push") {
2172        return serde_json::to_value(nested).map_err(|error| {
2173            OrchestratorError::Serve(format!("failed to encode a2a_push trigger config: {error}"))
2174        });
2175    }
2176    let filtered = kind_specific
2177        .iter()
2178        .filter(|(key, _)| key.as_str() != "path")
2179        .map(|(key, value)| (key.clone(), value.clone()))
2180        .collect::<BTreeMap<_, _>>();
2181    serde_json::to_value(filtered).map_err(|error| {
2182        OrchestratorError::Serve(format!("failed to encode a2a_push trigger config: {error}"))
2183    })
2184}
2185
2186fn connector_for(
2187    provider: &harn_vm::ProviderId,
2188    kinds: BTreeSet<String>,
2189) -> Box<dyn harn_vm::Connector> {
2190    match provider.as_str() {
2191        "cron" => Box::new(harn_vm::CronConnector::new()),
2192        _ => Box::new(PlaceholderConnector::new(provider.clone(), kinds)),
2193    }
2194}
2195
2196async fn connector_override_for(
2197    provider: &harn_vm::ProviderId,
2198    provider_overrides: &[ResolvedProviderConnectorConfig],
2199) -> Result<Option<Box<dyn harn_vm::Connector>>, OrchestratorError> {
2200    let Some(override_config) = provider_overrides
2201        .iter()
2202        .find(|entry| entry.id == *provider)
2203    else {
2204        return Ok(None);
2205    };
2206    match &override_config.connector {
2207        ResolvedProviderConnectorKind::RustBuiltin => Ok(None),
2208        ResolvedProviderConnectorKind::Invalid(message) => {
2209            Err(OrchestratorError::Serve(message.clone()))
2210        }
2211        ResolvedProviderConnectorKind::Harn { module } => {
2212            let module_path =
2213                harn_vm::resolve_module_import_path(&override_config.manifest_dir, module);
2214            let connector = harn_vm::HarnConnector::load(&module_path)
2215                .await
2216                .map_err(|error| {
2217                    format!(
2218                        "failed to load Harn connector '{}' for provider '{}': {error}",
2219                        module_path.display(),
2220                        provider.as_str()
2221                    )
2222                })?;
2223            Ok(Some(Box::new(connector)))
2224        }
2225    }
2226}
2227
2228fn build_route_configs(
2229    triggers: &[CollectedManifestTrigger],
2230    binding_versions: &BTreeMap<String, u32>,
2231) -> Result<Vec<RouteConfig>, OrchestratorError> {
2232    let mut seen_paths = BTreeSet::new();
2233    let mut routes = Vec::new();
2234    for trigger in triggers {
2235        let Some(binding_version) = binding_versions.get(&trigger.config.id).copied() else {
2236            return Err(format!(
2237                "trigger registry is missing active manifest binding '{}'",
2238                trigger.config.id
2239            )
2240            .into());
2241        };
2242        if let Some(route) = RouteConfig::from_trigger(trigger, binding_version)? {
2243            if !seen_paths.insert(route.path.clone()) {
2244                return Err(format!(
2245                    "trigger route '{}' is configured more than once",
2246                    route.path
2247                )
2248                .into());
2249            }
2250            routes.push(route);
2251        }
2252    }
2253    Ok(routes)
2254}
2255
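/// Attaches a connector handle to every route whose provider owns ingress
/// (either because the route requested connector ingress or because a Harn
/// connector override is configured for the provider). Fails if the registry
/// has no connector for such a provider.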
2256fn attach_route_connectors(
2257    routes: Vec<RouteConfig>,
2258    registry: &harn_vm::ConnectorRegistry,
2259    provider_overrides: &[ResolvedProviderConnectorConfig],
2260) -> Result<Vec<RouteConfig>, OrchestratorError> {
2261    routes
2262        .into_iter()
2263        .map(|mut route| {
2264            if route.connector_ingress
2265                || connector_owns_ingress(route.provider.as_str(), provider_overrides)
2266            {
2267                route.connector = Some(registry.get(&route.provider).ok_or_else(|| {
2268                    format!(
2269                        "connector registry is missing provider '{}'",
2270                        route.provider.as_str()
2271                    )
2272                })?);
2273            }
2274            Ok(route)
2275        })
2276        .collect()
2277}
2278
2279fn connector_owns_ingress(
2280    provider: &str,
2281    provider_overrides: &[ResolvedProviderConnectorConfig],
2282) -> bool {
2283    provider_overrides.iter().any(|entry| {
2284        entry.id.as_str() == provider
2285            && matches!(entry.connector, ResolvedProviderConnectorKind::Harn { .. })
2286    })
2287}
2288
2289fn provider_requires_harn_connector(provider: &str) -> bool {
2290    harn_vm::provider_metadata(provider).is_some_and(|metadata| {
2291        matches!(
2292            metadata.runtime,
2293            harn_vm::ProviderRuntimeMetadata::Placeholder
2294        )
2295    })
2296}
2297
2298fn live_manifest_binding_versions() -> BTreeMap<String, u32> {
2299    let mut versions = BTreeMap::new();
2300    for binding in harn_vm::snapshot_trigger_bindings() {
2301        if binding.source != harn_vm::TriggerBindingSource::Manifest {
2302            continue;
2303        }
2304        if binding.state == harn_vm::TriggerState::Terminated {
2305            continue;
2306        }
2307        versions
2308            .entry(binding.id)
2309            .and_modify(|current: &mut u32| *current = (*current).max(binding.version))
2310            .or_insert(binding.version);
2311    }
2312    versions
2313}
2314
2315fn trigger_state_snapshots(
2316    triggers: &[CollectedManifestTrigger],
2317    listener_metrics: &BTreeMap<String, TriggerMetricSnapshot>,
2318) -> Vec<TriggerStateSnapshot> {
2319    let bindings_by_id = harn_vm::snapshot_trigger_bindings()
2320        .into_iter()
2321        .filter(|binding| binding.source == harn_vm::TriggerBindingSource::Manifest)
2322        .fold(
2323            BTreeMap::<String, harn_vm::TriggerBindingSnapshot>::new(),
2324            |mut acc, binding| {
2325                match acc.get(&binding.id) {
2326                    Some(current) if current.version >= binding.version => {}
2327                    _ => {
2328                        acc.insert(binding.id.clone(), binding);
2329                    }
2330                }
2331                acc
2332            },
2333        );
2334
2335    triggers
2336        .iter()
2337        .map(|trigger| {
2338            let runtime = bindings_by_id.get(&trigger.config.id);
2339            let metrics = listener_metrics.get(&trigger.config.id);
2340            TriggerStateSnapshot {
2341                id: trigger.config.id.clone(),
2342                provider: trigger.config.provider.as_str().to_string(),
2343                kind: trigger_kind_name(trigger.config.kind).to_string(),
2344                handler: handler_kind(&trigger.handler).to_string(),
2345                version: runtime.map(|binding| binding.version),
2346                state: runtime.map(|binding| binding.state.as_str().to_string()),
2347                received: metrics.map(|value| value.received).unwrap_or(0),
2348                dispatched: metrics.map(|value| value.dispatched).unwrap_or(0),
2349                failed: metrics.map(|value| value.failed).unwrap_or(0),
2350                in_flight: metrics.map(|value| value.in_flight).unwrap_or(0),
2351            }
2352        })
2353        .collect()
2354}
2355
2356// ── Misc helpers ──────────────────────────────────────────────────────────────
2357
2358fn format_trigger_summary(triggers: &[CollectedManifestTrigger]) -> String {
2359    if triggers.is_empty() {
2360        return "none".to_string();
2361    }
2362    triggers
2363        .iter()
2364        .map(|trigger| {
2365            format!(
2366                "{} [{}:{} -> {}]",
2367                trigger.config.id,
2368                trigger.config.provider.as_str(),
2369                trigger_kind_name(trigger.config.kind),
2370                handler_kind(&trigger.handler)
2371            )
2372        })
2373        .collect::<Vec<_>>()
2374        .join(", ")
2375}
2376
2377fn format_activation_summary(activations: &[harn_vm::ActivationHandle]) -> String {
2378    if activations.is_empty() {
2379        return "none".to_string();
2380    }
2381    activations
2382        .iter()
2383        .map(|activation| {
2384            format!(
2385                "{}({})",
2386                activation.provider.as_str(),
2387                activation.binding_count
2388            )
2389        })
2390        .collect::<Vec<_>>()
2391        .join(", ")
2392}
2393
2394fn handler_kind(handler: &CollectedTriggerHandler) -> &'static str {
2395    match handler {
2396        CollectedTriggerHandler::Local { .. } => "local",
2397        CollectedTriggerHandler::A2a { .. } => "a2a",
2398        CollectedTriggerHandler::Worker { .. } => "worker",
2399        CollectedTriggerHandler::Persona { .. } => "persona",
2400    }
2401}
2402
2403fn trigger_kind_name(kind: crate::package::TriggerKind) -> &'static str {
2404    match kind {
2405        crate::package::TriggerKind::Webhook => "webhook",
2406        crate::package::TriggerKind::Cron => "cron",
2407        crate::package::TriggerKind::Poll => "poll",
2408        crate::package::TriggerKind::Stream => "stream",
2409        crate::package::TriggerKind::Predicate => "predicate",
2410        crate::package::TriggerKind::A2aPush => "a2a-push",
2411    }
2412}
2413
2414fn has_orchestrator_api_keys_configured() -> bool {
2415    std::env::var("HARN_ORCHESTRATOR_API_KEYS")
2416        .ok()
2417        .is_some_and(|value| value.split(',').any(|segment| !segment.trim().is_empty()))
2418}
2419
2420fn has_mcp_oauth_configured() -> bool {
2421    std::env::var("HARN_MCP_OAUTH_AUTHORIZATION_SERVERS")
2422        .ok()
2423        .is_some_and(|value| value.split(',').any(|segment| !segment.trim().is_empty()))
2424}
2425
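/// Validates the embedded MCP endpoint paths (for example `--mcp-path /mcp`;
/// the value shown is illustrative): each must start with '/', must not be the
/// bare root, must not collide with a reserved listener path such as `/health`
/// or `/metrics`, and the three paths must be distinct.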
2426fn validate_mcp_paths(
2427    path: &str,
2428    sse_path: &str,
2429    messages_path: &str,
2430) -> Result<(), OrchestratorError> {
2431    let reserved = [
2432        "/health",
2433        "/healthz",
2434        "/readyz",
2435        "/metrics",
2436        "/admin/reload",
2437        "/acp",
2438    ];
2439    let mut seen = BTreeSet::new();
2440    for (label, value) in [
2441        ("--mcp-path", path),
2442        ("--mcp-sse-path", sse_path),
2443        ("--mcp-messages-path", messages_path),
2444    ] {
2445        if !value.starts_with('/') {
2446            return Err(format!("{label} must start with '/'").into());
2447        }
2448        if value == "/" {
2449            return Err(format!("{label} cannot be '/'").into());
2450        }
2451        if reserved.contains(&value) {
2452            return Err(format!("{label} cannot use reserved listener path '{value}'").into());
2453        }
2454        if !seen.insert(value) {
2455            return Err(format!("embedded MCP paths must be unique; duplicate '{value}'").into());
2456        }
2457    }
2458    Ok(())
2459}
2460
2461async fn append_lifecycle_event(
2462    log: &Arc<AnyEventLog>,
2463    kind: &str,
2464    payload: JsonValue,
2465) -> Result<(), OrchestratorError> {
2466    let topic =
2467        harn_vm::event_log::Topic::new(LIFECYCLE_TOPIC).map_err(|error| error.to_string())?;
2468    log.append(&topic, harn_vm::event_log::LogEvent::new(kind, payload))
2469        .await
2470        .map(|_| ())
2471        .map_err(|error| {
2472            OrchestratorError::Serve(format!(
2473                "failed to append orchestrator lifecycle event: {error}"
2474            ))
2475        })
2476}
2477
2478async fn append_pump_lifecycle_event(
2479    log: &Arc<AnyEventLog>,
2480    kind: &str,
2481    payload: JsonValue,
2482) -> Result<(), OrchestratorError> {
2483    append_lifecycle_event(log, kind, payload).await
2484}
2485
2486async fn wait_for_pump_drain_release(
2487    log: &Arc<AnyEventLog>,
2488    topic: &harn_vm::event_log::Topic,
2489    event_id: u64,
2490    gate_rx: &mut watch::Receiver<bool>,
2491) -> Result<(), OrchestratorError> {
2492    if !*gate_rx.borrow() {
2493        return Ok(());
2494    }
2495    append_pump_lifecycle_event(
2496        log,
2497        "pump_drain_waiting",
2498        json!({
2499            "topic": topic.as_str(),
2500            "event_log_id": event_id,
2501        }),
2502    )
2503    .await?;
2504    while *gate_rx.borrow() {
2505        if gate_rx.changed().await.is_err() {
2506            break;
2507        }
2508    }
2509    Ok(())
2510}
2511
2512#[cfg_attr(not(unix), allow(dead_code))]
2513async fn append_manifest_event(
2514    log: &Arc<AnyEventLog>,
2515    kind: &str,
2516    payload: JsonValue,
2517) -> Result<(), OrchestratorError> {
2518    let topic =
2519        harn_vm::event_log::Topic::new(MANIFEST_TOPIC).map_err(|error| error.to_string())?;
2520    log.append(&topic, harn_vm::event_log::LogEvent::new(kind, payload))
2521        .await
2522        .map(|_| ())
2523        .map_err(|error| {
2524            OrchestratorError::Serve(format!(
2525                "failed to append orchestrator manifest event: {error}"
2526            ))
2527        })
2528}
2529
2530async fn record_pump_metrics(
2531    metrics: &harn_vm::MetricsRegistry,
2532    log: &Arc<AnyEventLog>,
2533    topic: &harn_vm::event_log::Topic,
2534    last_seen: u64,
2535    outstanding: usize,
2536) -> Result<(), OrchestratorError> {
2537    let latest = log.latest(topic).await.ok().flatten().unwrap_or(last_seen);
2538    let backlog = latest.saturating_sub(last_seen);
2539    metrics.set_orchestrator_pump_outstanding(topic.as_str(), outstanding);
2540    metrics.set_orchestrator_pump_backlog(topic.as_str(), backlog);
2541    append_pump_lifecycle_event(
2542        log,
2543        "pump_metrics_recorded",
2544        json!({
2545            "topic": topic.as_str(),
2546            "latest": latest,
2547            "last_seen": last_seen,
2548            "backlog": backlog,
2549            "outstanding": outstanding,
2550        }),
2551    )
2552    .await
2553}
2554
2555fn admission_delay(occurred_at_ms: i64) -> Duration {
2556    let now = std::time::SystemTime::now()
2557        .duration_since(std::time::UNIX_EPOCH)
2558        .unwrap_or_default()
2559        .as_millis() as i64;
2560    Duration::from_millis(now.saturating_sub(occurred_at_ms).max(0) as u64)
2561}
2562
2563async fn emit_drain_truncated(
2564    log: &Arc<AnyEventLog>,
2565    topic_name: &str,
2566    report: PumpDrainReport,
2567    config: DrainConfig,
2568) -> Result<(), OrchestratorError> {
2569    if !report.truncated() {
2570        return Ok(());
2571    }
2572    eprintln!(
2573        "[harn] warning: pump drain truncated for {topic_name}: remaining_queued={} drain_items={} reason={}",
2574        report.remaining_queued,
2575        report.drain_items,
2576        report.stop_reason.as_str()
2577    );
2578    append_lifecycle_event(
2579        log,
2580        "drain_truncated",
2581        json!({
2582            "topic": topic_name,
2583            "remaining_queued": report.remaining_queued,
2584            "drain_items": report.drain_items,
2585            "outstanding_tasks": report.outstanding_tasks,
2586            "max_items": config.max_items,
2587            "deadline_secs": config.deadline.as_secs(),
2588            "reason": report.stop_reason.as_str(),
2589        }),
2590    )
2591    .await
2592}
2593
2594async fn topic_latest_id(
2595    log: &Arc<AnyEventLog>,
2596    topic_name: &str,
2597) -> Result<u64, OrchestratorError> {
2598    let topic = harn_vm::event_log::Topic::new(topic_name).map_err(|error| error.to_string())?;
2599    log.latest(&topic)
2600        .await
2601        .map(|value| value.unwrap_or(0))
2602        .map_err(|error| {
2603            OrchestratorError::Serve(format!(
2604                "failed to read topic head for {topic_name}: {error}"
2605            ))
2606        })
2607}
2608
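/// Drains a pump up to the topic's current head, bounded by both the drain
/// config and the overall shutdown deadline. If the drain errors or times out,
/// a best-effort report is reconstructed from the persisted consumer cursor so
/// shutdown accounting still reflects how far the pump got.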
2609async fn drain_pump_best_effort(
2610    log: &Arc<AnyEventLog>,
2611    topic_name: &str,
2612    pump: PumpHandle,
2613    config: DrainConfig,
2614    overall_deadline: tokio::time::Instant,
2615) -> Result<PumpDrainReport, OrchestratorError> {
2616    let topic = harn_vm::event_log::Topic::new(topic_name).map_err(|error| error.to_string())?;
2617    let consumer = pump_consumer_id(&topic)?;
2618    let start_seen = log
2619        .consumer_cursor(&topic, &consumer)
2620        .await
2621        .map_err(|error| format!("failed to read consumer cursor for {topic_name}: {error}"))?
2622        .unwrap_or(0);
2623    let up_to = log
2624        .latest(&topic)
2625        .await
2626        .map_err(|error| format!("failed to read topic head for {topic_name}: {error}"))?
2627        .unwrap_or(0);
2628    let budget = remaining_budget(overall_deadline);
2629
2630    match tokio::time::timeout(
2631        budget,
2632        pump.drain(log, topic_name, up_to, config, overall_deadline),
2633    )
2634    .await
2635    {
2636        Ok(Ok(report)) => Ok(report),
2637        Ok(Err(error)) => {
2638            eprintln!("[harn] warning: pump drain error for {topic_name}: {error}");
2639            best_effort_pump_report(
2640                log,
2641                &topic,
2642                &consumer,
2643                start_seen,
2644                up_to,
2645                PumpDrainStopReason::Error,
2646            )
2647            .await
2648        }
2649        Err(_) => {
2650            eprintln!(
2651                "[harn] warning: pump drain timed out for {topic_name} after {:?}",
2652                budget
2653            );
2654            best_effort_pump_report(
2655                log,
2656                &topic,
2657                &consumer,
2658                start_seen,
2659                up_to,
2660                PumpDrainStopReason::Deadline,
2661            )
2662            .await
2663        }
2664    }
2665}
2666
2667async fn best_effort_pump_report(
2668    log: &Arc<AnyEventLog>,
2669    topic: &harn_vm::event_log::Topic,
2670    consumer: &ConsumerId,
2671    start_seen: u64,
2672    up_to: u64,
2673    stop_reason: PumpDrainStopReason,
2674) -> Result<PumpDrainReport, OrchestratorError> {
2675    let last_seen = log
2676        .consumer_cursor(topic, consumer)
2677        .await
2678        .map_err(|error| format!("failed to read consumer cursor for {topic}: {error}"))?
2679        .unwrap_or(start_seen);
2680    let stats = PumpStats {
2681        last_seen,
2682        processed: last_seen.saturating_sub(start_seen),
2683    };
2684    Ok(pump_drain_report(stats, start_seen, up_to, 0, stop_reason))
2685}
2686
2687fn remaining_budget(deadline: tokio::time::Instant) -> Duration {
2688    deadline.saturating_duration_since(tokio::time::Instant::now())
2689}
2690
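/// Decides whether a draining pump can stop: drained once the cursor reaches
/// the requested head with no outstanding tasks, otherwise stopped early when
/// the max-items budget is exhausted or the drain deadline passes. Returns
/// `None` while the drain should keep running.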
2691fn maybe_finish_pump_drain(
2692    stats: PumpStats,
2693    progress: PumpDrainProgress,
2694    outstanding_tasks: usize,
2695) -> Option<PumpDrainReport> {
2696    if stats.last_seen >= progress.request.up_to && outstanding_tasks == 0 {
2697        return Some(pump_drain_report(
2698            stats,
2699            progress.start_seen,
2700            progress.request.up_to,
2701            outstanding_tasks,
2702            PumpDrainStopReason::Drained,
2703        ));
2704    }
2705    if outstanding_tasks > 0 {
2706        if tokio::time::Instant::now() >= progress.request.deadline {
2707            return Some(pump_drain_report(
2708                stats,
2709                progress.start_seen,
2710                progress.request.up_to,
2711                outstanding_tasks,
2712                PumpDrainStopReason::Deadline,
2713            ));
2714        }
2715        return None;
2716    }
2717    let drain_items = stats.last_seen.saturating_sub(progress.start_seen);
2718    if drain_items >= progress.request.config.max_items as u64 {
2719        return Some(pump_drain_report(
2720            stats,
2721            progress.start_seen,
2722            progress.request.up_to,
2723            outstanding_tasks,
2724            PumpDrainStopReason::MaxItems,
2725        ));
2726    }
2727    if tokio::time::Instant::now() >= progress.request.deadline {
2728        return Some(pump_drain_report(
2729            stats,
2730            progress.start_seen,
2731            progress.request.up_to,
2732            outstanding_tasks,
2733            PumpDrainStopReason::Deadline,
2734        ));
2735    }
2736    None
2737}
2738
2739fn pump_drain_report(
2740    stats: PumpStats,
2741    start_seen: u64,
2742    up_to: u64,
2743    outstanding_tasks: usize,
2744    stop_reason: PumpDrainStopReason,
2745) -> PumpDrainReport {
2746    PumpDrainReport {
2747        stats,
2748        drain_items: stats.last_seen.saturating_sub(start_seen),
2749        remaining_queued: up_to.saturating_sub(stats.last_seen),
2750        outstanding_tasks,
2751        stop_reason,
2752    }
2753}
2754
2755fn pump_consumer_id(topic: &harn_vm::event_log::Topic) -> Result<ConsumerId, OrchestratorError> {
2756    ConsumerId::new(format!("orchestrator-pump.{}", topic.as_str())).map_err(|error| {
2757        OrchestratorError::Serve(format!("failed to create consumer id for {topic}: {error}"))
2758    })
2759}
2760
2761fn inbox_task_test_release_file() -> Option<PathBuf> {
2762    test_file_from_env(TEST_INBOX_TASK_RELEASE_FILE_ENV)
2763}
2764
2765fn test_file_from_env(key: &str) -> Option<PathBuf> {
2766    std::env::var_os(key)
2767        .filter(|value| !value.is_empty())
2768        .map(PathBuf::from)
2769}
2770
2771async fn wait_for_test_release_file(path: &Path) {
2772    while tokio::fs::metadata(path).await.is_err() {
2773        tokio::time::sleep(Duration::from_millis(10)).await;
2774    }
2775}
2776
2777fn pending_pump_test_should_fail() -> bool {
2778    std::env::var(TEST_FAIL_PENDING_PUMP_ENV)
2779        .ok()
2780        .is_some_and(|value| value != "0")
2781}
2782
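/// Watches the manifest's parent directory (non-recursively) for changes to
/// the manifest file and triggers a `file_watch` reload. Notifications are
/// debounced for roughly 200ms on a local task so bursts of filesystem events
/// collapse into a single reload request.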
2783fn spawn_manifest_watcher(
2784    config_path: PathBuf,
2785    reload: AdminReloadHandle,
2786) -> Result<notify::RecommendedWatcher, OrchestratorError> {
2787    use notify::{Event, EventKind, RecursiveMode, Watcher};
2788
2789    let watch_dir = config_path.parent().ok_or_else(|| {
2790        format!(
2791            "manifest has no parent directory: {}",
2792            config_path.display()
2793        )
2794    })?;
2795    let target_name = config_path
2796        .file_name()
2797        .and_then(|name| name.to_str())
2798        .ok_or_else(|| {
2799            format!(
2800                "manifest path is not valid UTF-8: {}",
2801                config_path.display()
2802            )
2803        })?
2804        .to_string();
2805    let (tx, mut rx) = mpsc::unbounded_channel::<()>();
2806    tokio::task::spawn_local(async move {
2807        while rx.recv().await.is_some() {
2808            tokio::time::sleep(Duration::from_millis(200)).await;
2809            while rx.try_recv().is_ok() {}
2810            let _ = reload.trigger("file_watch");
2811        }
2812    });
2813    let mut watcher =
2814        notify::recommended_watcher(move |res: Result<Event, notify::Error>| match res {
2815            Ok(event)
2816                if matches!(
2817                    event.kind,
2818                    EventKind::Modify(_)
2819                        | EventKind::Create(_)
2820                        | EventKind::Remove(_)
2821                        | EventKind::Any
2822                ) && event.paths.iter().any(|path| {
2823                    path.file_name()
2824                        .and_then(|name| name.to_str())
2825                        .is_some_and(|name| name == target_name)
2826                }) =>
2827            {
2828                let _ = tx.send(());
2829            }
2830            _ => {}
2831        })
2832        .map_err(|error| format!("failed to create manifest watcher: {error}"))?;
2833    watcher
2834        .watch(watch_dir, RecursiveMode::NonRecursive)
2835        .map_err(|error| {
2836            format!(
2837                "failed to watch manifest directory {}: {error}",
2838                watch_dir.display()
2839            )
2840        })?;
2841    Ok(watcher)
2842}
2843
2844pub(crate) fn load_manifest(config_path: &Path) -> Result<(Manifest, PathBuf), OrchestratorError> {
2845    if !config_path.is_file() {
2846        return Err(format!("manifest not found: {}", config_path.display()).into());
2847    }
2848    let content = std::fs::read_to_string(config_path)
2849        .map_err(|error| format!("failed to read {}: {error}", config_path.display()))?;
2850    let manifest = toml::from_str::<Manifest>(&content)
2851        .map_err(|error| format!("failed to parse {}: {error}", config_path.display()))?;
2852    let manifest_dir = config_path.parent().map(Path::to_path_buf).ok_or_else(|| {
2853        format!(
2854            "manifest has no parent directory: {}",
2855            config_path.display()
2856        )
2857    })?;
2858    Ok((manifest, manifest_dir))
2859}
2860
2861pub(crate) fn absolutize_from_cwd(path: &Path) -> Result<PathBuf, OrchestratorError> {
2862    let candidate = if path.is_absolute() {
2863        path.to_path_buf()
2864    } else {
2865        std::env::current_dir()
2866            .map_err(|error| format!("failed to read current directory: {error}"))?
2867            .join(path)
2868    };
2869    Ok(candidate)
2870}
2871
2872fn configured_secret_chain_display() -> String {
2873    std::env::var(harn_vm::secrets::SECRET_PROVIDER_CHAIN_ENV)
2874        .unwrap_or_else(|_| harn_vm::secrets::DEFAULT_SECRET_PROVIDER_CHAIN.to_string())
2875        .split(',')
2876        .map(str::trim)
2877        .filter(|segment| !segment.is_empty())
2878        .collect::<Vec<_>>()
2879        .join(" -> ")
2880}
2881
2882fn secret_namespace_for(manifest_dir: &Path) -> String {
2883    match std::env::var("HARN_SECRET_NAMESPACE") {
2884        Ok(namespace) if !namespace.trim().is_empty() => namespace,
2885        _ => {
2886            let leaf = manifest_dir
2887                .file_name()
2888                .and_then(|name| name.to_str())
2889                .filter(|name| !name.is_empty())
2890                .unwrap_or("workspace");
2891            format!("harn/{leaf}")
2892        }
2893    }
2894}
2895
2896fn now_rfc3339() -> Result<String, OrchestratorError> {
2897    OffsetDateTime::now_utc()
2898        .format(&Rfc3339)
2899        .map_err(|error| OrchestratorError::Serve(format!("failed to format timestamp: {error}")))
2900}
2901
2902fn write_state_snapshot(
2903    path: &Path,
2904    snapshot: &ServeStateSnapshot,
2905) -> Result<(), OrchestratorError> {
2906    let encoded = serde_json::to_vec_pretty(snapshot)
2907        .map_err(|error| format!("failed to encode orchestrator state snapshot: {error}"))?;
2908    if let Some(parent) = path.parent() {
2909        std::fs::create_dir_all(parent)
2910            .map_err(|error| format!("failed to create {}: {error}", parent.display()))?;
2911    }
2912    std::fs::write(path, encoded).map_err(|error| {
2913        OrchestratorError::Serve(format!("failed to write {}: {error}", path.display()))
2914    })
2915}
2916
2917// ── Snapshot types ────────────────────────────────────────────────────────────
2918
2919#[derive(Debug, Serialize)]
2920struct ServeStateSnapshot {
2921    status: String,
2922    role: String,
2923    bind: String,
2924    listener_url: String,
2925    manifest_path: String,
2926    state_dir: String,
2927    started_at: String,
2928    stopped_at: Option<String>,
2929    secret_provider_chain: String,
2930    event_log_backend: String,
2931    event_log_location: Option<String>,
2932    triggers: Vec<TriggerStateSnapshot>,
2933    connectors: Vec<String>,
2934    activations: Vec<ConnectorActivationSnapshot>,
2935}
2936
2937#[derive(Debug, Serialize)]
2938struct TriggerStateSnapshot {
2939    id: String,
2940    provider: String,
2941    kind: String,
2942    handler: String,
2943    version: Option<u32>,
2944    state: Option<String>,
2945    received: u64,
2946    dispatched: u64,
2947    failed: u64,
2948    in_flight: u64,
2949}
2950
2951#[derive(Debug, Serialize)]
2952struct ConnectorActivationSnapshot {
2953    provider: String,
2954    binding_count: usize,
2955}
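
// Sketch of the serialized snapshot (field names from the structs above;
// values are illustrative only and do not come from the original source):
//
// {
//   "status": "running",
//   "role": "leader",
//   "bind": "127.0.0.1:8080",
//   "listener_url": "http://127.0.0.1:8080",
//   "manifest_path": "/work/app/harn.toml",
//   "state_dir": "/work/app/state",
//   "started_at": "2024-01-01T00:00:00Z",
//   "stopped_at": null,
//   "secret_provider_chain": "env",
//   "event_log_backend": "sqlite",
//   "event_log_location": "/work/app/state/events.db",
//   "triggers": [{"id": "ws-stream", "provider": "websocket", "kind": "stream",
//                 "handler": "handlers::on_stream", "version": 1, "state": "active",
//                 "received": 1, "dispatched": 1, "failed": 0, "in_flight": 0}],
//   "connectors": ["websocket"],
//   "activations": [{"provider": "websocket", "binding_count": 1}]
// }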
2956
2957// ── Placeholder connector ─────────────────────────────────────────────────────
2958
2959struct PlaceholderConnector {
2960    provider_id: harn_vm::ProviderId,
2961    kinds: Vec<harn_vm::TriggerKind>,
2962    _ctx: Option<harn_vm::ConnectorCtx>,
2963}
2964
2965impl PlaceholderConnector {
2966    fn new(provider_id: harn_vm::ProviderId, kinds: BTreeSet<String>) -> Self {
2967        Self {
2968            provider_id,
2969            kinds: kinds.into_iter().map(harn_vm::TriggerKind::from).collect(),
2970            _ctx: None,
2971        }
2972    }
2973}
2974
2975struct PlaceholderClient;
2976
2977#[async_trait]
2978impl harn_vm::ConnectorClient for PlaceholderClient {
2979    async fn call(
2980        &self,
2981        method: &str,
2982        _args: JsonValue,
2983    ) -> Result<JsonValue, harn_vm::ClientError> {
2984        Err(harn_vm::ClientError::Other(format!(
2985            "connector client method '{method}' is not implemented in the orchestrator scaffold"
2986        )))
2987    }
2988}
2989
2990#[async_trait]
2991impl harn_vm::Connector for PlaceholderConnector {
2992    fn provider_id(&self) -> &harn_vm::ProviderId {
2993        &self.provider_id
2994    }
2995
2996    fn kinds(&self) -> &[harn_vm::TriggerKind] {
2997        &self.kinds
2998    }
2999
3000    async fn init(&mut self, ctx: harn_vm::ConnectorCtx) -> Result<(), harn_vm::ConnectorError> {
3001        self._ctx = Some(ctx);
3002        Ok(())
3003    }
3004
3005    async fn activate(
3006        &self,
3007        bindings: &[harn_vm::TriggerBinding],
3008    ) -> Result<harn_vm::ActivationHandle, harn_vm::ConnectorError> {
3009        Ok(harn_vm::ActivationHandle::new(
3010            self.provider_id.clone(),
3011            bindings.len(),
3012        ))
3013    }
3014
3015    async fn normalize_inbound(
3016        &self,
3017        _raw: harn_vm::RawInbound,
3018    ) -> Result<harn_vm::TriggerEvent, harn_vm::ConnectorError> {
3019        Err(harn_vm::ConnectorError::Unsupported(format!(
3020            "connector '{}' inbound normalization is not implemented yet",
3021            self.provider_id.as_str()
3022        )))
3023    }
3024
3025    fn payload_schema(&self) -> harn_vm::ProviderPayloadSchema {
3026        harn_vm::ProviderPayloadSchema::named("TriggerEvent")
3027    }
3028
3029    fn client(&self) -> Arc<dyn harn_vm::ConnectorClient> {
3030        Arc::new(PlaceholderClient)
3031    }
3032}
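
// Behavior note (derived from the impls above): a placeholder connector accepts
// `activate` for any set of bindings, but `normalize_inbound` returns
// `ConnectorError::Unsupported` and every `client().call(..)` returns
// `ClientError::Other`, so routes that reach these paths fail with explicit
// "not implemented" errors instead of silently dropping events.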
3033
3034#[cfg(test)]
3035mod tests {
3036    use super::*;
3037    use futures::StreamExt;
3038    use harn_vm::event_log::{EventLog, Topic};
3039
3040    fn write_test_file(dir: &Path, relative: &str, contents: &str) {
3041        let path = dir.join(relative);
3042        if let Some(parent) = path.parent() {
3043            std::fs::create_dir_all(parent).unwrap();
3044        }
3045        std::fs::write(path, contents).unwrap();
3046    }
3047
3048    fn stream_manifest_fixture() -> &'static str {
3049        r#"
3050[package]
3051name = "fixture"
3052
3053[exports]
3054handlers = "lib.harn"
3055
3056[[triggers]]
3057id = "ws-stream"
3058kind = "stream"
3059provider = "websocket"
3060path = "/streams/ws"
3061match = { events = ["quote.tick"] }
3062handler = "handlers::on_stream"
3063"#
3064    }
3065
3066    fn stream_handler_fixture(marker_path: &Path) -> String {
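        // The marker path is spliced in with `{marker:?}` so it is emitted as a
        // quoted, escaped string literal in the generated `.harn` source; the
        // doubled `{{ }}` braces below are literal braces in the handler body,
        // not format arguments.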
3067        format!(
3068            r#"
3069import "std/triggers"
3070
3071pub fn on_stream(event: TriggerEvent) {{
3072  write_file({marker:?}, json_stringify({{
3073    provider: event.provider,
3074    kind: event.kind,
3075    key: event.provider_payload.key,
3076    stream: event.provider_payload.stream,
3077    amount: event.provider_payload.raw.value.amount,
3078  }}))
3079}}
3080"#,
3081            marker = marker_path.display().to_string()
3082        )
3083    }
3084
3085    /// Proof test: `stream_trigger_route_uses_generic_stream_connector`,
3086    /// migrated from a subprocess + SQLite-polling setup to the in-process
3087    /// harness plus `EventLog::subscribe()`.
3088    #[tokio::test(flavor = "multi_thread")]
3089    async fn stream_trigger_route_uses_generic_stream_connector_in_process() {
3090        // Env vars are process-global; hold the lock for the entire test so
3091        // concurrent unit tests that also set env vars don't race.
3092        let _env_lock = crate::tests::common::env_lock::lock_env().lock().await;
3093        let _secret_providers = crate::env_guard::ScopedEnvVar::set("HARN_SECRET_PROVIDERS", "env");
3094
3095        let temp = tempfile::TempDir::new().unwrap();
3096        let marker_path = temp.path().join("stream-handler.json");
3097        write_test_file(temp.path(), "harn.toml", stream_manifest_fixture());
3098        write_test_file(
3099            temp.path(),
3100            "lib.harn",
3101            &stream_handler_fixture(&marker_path),
3102        );
3103
3104        let config =
3105            OrchestratorConfig::for_test(temp.path().join("harn.toml"), temp.path().join("state"));
3106        let harness = OrchestratorHarness::start(config)
3107            .await
3108            .expect("harness start");
3109        let base_url = harness.listener_url().to_string();
3110        let event_log = harness.event_log();
3111
3112        let response = reqwest::Client::new()
3113            .post(format!("{base_url}/streams/ws"))
3114            .header("content-type", "application/json")
3115            .json(&serde_json::json!({
3116                "key": "acct-1",
3117                "stream": "quotes",
3118                "value": {"amount": 10}
3119            }))
3120            .send()
3121            .await
3122            .unwrap();
3123        assert_eq!(response.status(), 200);
3124
3125        // Event-driven wait: subscribe to orchestrator.lifecycle and block
3126        // until pump_dispatch_completed arrives, replacing SQLite polling.
3127        let topic = Topic::new("orchestrator.lifecycle").unwrap();
3128        let mut stream = event_log.clone().subscribe(&topic, None).await.unwrap();
3129        let deadline = tokio::time::Instant::now() + std::time::Duration::from_secs(30);
3130        loop {
3131            let remaining = deadline
3132                .checked_duration_since(tokio::time::Instant::now())
3133                .expect("timed out waiting for pump_dispatch_completed");
3134            let (_, event) = tokio::time::timeout(remaining, stream.next())
3135                .await
3136                .expect("timed out waiting for pump_dispatch_completed event")
3137                .expect("event stream ended unexpectedly")
3138                .expect("event stream error");
3139            if event.kind == "pump_dispatch_completed"
3140                && event.payload["status"] == serde_json::json!("completed")
3141            {
3142                break;
3143            }
3144        }
3145        drop(stream);
3146
3147        let marker: serde_json::Value =
3148            serde_json::from_str(&std::fs::read_to_string(&marker_path).unwrap()).unwrap();
3149        assert_eq!(
3150            marker.get("provider").and_then(|v| v.as_str()),
3151            Some("websocket")
3152        );
3153        assert_eq!(
3154            marker.get("kind").and_then(|v| v.as_str()),
3155            Some("quote.tick")
3156        );
3157        assert_eq!(marker.get("key").and_then(|v| v.as_str()), Some("acct-1"));
3158        assert_eq!(
3159            marker.get("stream").and_then(|v| v.as_str()),
3160            Some("quotes")
3161        );
3162        assert_eq!(marker.get("amount").and_then(|v| v.as_i64()), Some(10));
3163
3164        harness
3165            .shutdown(std::time::Duration::from_secs(5))
3166            .await
3167            .expect("harness shutdown");
3168    }
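
    // Added sketches (not part of the original test suite): direct unit tests
    // for the pure helpers above, exercising them without starting the full
    // harness. The fixture manifest is reused from `stream_manifest_fixture`.

    #[test]
    fn load_manifest_returns_manifest_and_parent_dir() {
        let temp = tempfile::TempDir::new().unwrap();
        write_test_file(temp.path(), "harn.toml", stream_manifest_fixture());

        let manifest_path = temp.path().join("harn.toml");
        let (_manifest, manifest_dir) = load_manifest(&manifest_path).expect("manifest loads");
        assert_eq!(manifest_dir, temp.path());

        // A missing file is reported as an error rather than a panic.
        assert!(load_manifest(&temp.path().join("missing.toml")).is_err());
    }

    #[test]
    fn absolutize_from_cwd_keeps_absolute_paths_and_joins_relative_ones() {
        let temp = tempfile::TempDir::new().unwrap();
        let absolute = temp.path().join("harn.toml");
        assert_eq!(absolutize_from_cwd(&absolute).unwrap(), absolute);

        // Relative paths are resolved against the current working directory.
        let expected = std::env::current_dir().unwrap().join("relative.toml");
        assert_eq!(
            absolutize_from_cwd(Path::new("relative.toml")).unwrap(),
            expected
        );
    }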
3169}