1use std::collections::{BTreeMap, BTreeSet};
2use std::net::SocketAddr;
3use std::path::{Path, PathBuf};
4use std::sync::Arc;
5use std::time::Duration;
6
7use async_trait::async_trait;
8use futures::StreamExt;
9use harn_vm::clock::{Clock, RealClock};
10use serde::{Deserialize, Serialize};
11use serde_json::{json, Value as JsonValue};
12use time::format_description::well_known::Rfc3339;
13use time::OffsetDateTime;
14use tokio::sync::{mpsc, oneshot, watch};
15use tokio::task::JoinSet;
16
17use harn_vm::event_log::{AnyEventLog, ConsumerId, EventLog};
18
19use super::common::stranded_envelopes;
20use super::errors::OrchestratorError;
21use super::listener::{
22 AdminReloadHandle, AdminReloadRequest, ListenerConfig, ListenerRuntime, RouteConfig,
23 TriggerMetricSnapshot,
24};
25use super::origin_guard::OriginAllowList;
26use super::role::OrchestratorRole;
27use super::supervisor_state::apply_supervisor_state;
28use super::tls::TlsFiles;
29use crate::package::{
30 self, CollectedManifestTrigger, CollectedTriggerHandler, Manifest,
31 ResolvedProviderConnectorConfig, ResolvedProviderConnectorKind, ResolvedTriggerConfig,
32};
33
34const LIFECYCLE_TOPIC: &str = "orchestrator.lifecycle";
35#[cfg_attr(not(unix), allow(dead_code))]
36const MANIFEST_TOPIC: &str = "orchestrator.manifest";
37const STATE_SNAPSHOT_FILE: &str = "orchestrator-state.json";
38const PENDING_TOPIC: &str = "orchestrator.triggers.pending";
39const CRON_TICK_TOPIC: &str = "connectors.cron.tick";
40const TEST_INBOX_TASK_RELEASE_FILE_ENV: &str = "HARN_TEST_ORCHESTRATOR_INBOX_TASK_RELEASE_FILE";
41const TEST_FAIL_PENDING_PUMP_ENV: &str = "HARN_TEST_ORCHESTRATOR_FAIL_PENDING_PUMP";
42const WAITPOINT_SERVICE_INTERVAL: Duration = Duration::from_millis(250);
43
44#[derive(Clone, Copy, Debug, PartialEq, Eq)]
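/// Bounds applied when draining a topic pump during graceful shutdown:
/// stop after `max_items` events or once `deadline` has elapsed,
/// whichever comes first.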
48pub struct DrainConfig {
49 pub max_items: usize,
50 pub deadline: Duration,
51}
52
53impl Default for DrainConfig {
54 fn default() -> Self {
55 Self {
56 max_items: crate::package::default_orchestrator_drain_max_items(),
57 deadline: Duration::from_secs(
58 crate::package::default_orchestrator_drain_deadline_seconds(),
59 ),
60 }
61 }
62}
63
64#[derive(Clone, Copy, Debug, PartialEq, Eq)]
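/// Back-pressure limit: the maximum number of in-flight dispatch tasks a concurrent pump (such as the inbox pump) keeps outstanding at once.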
66pub struct PumpConfig {
67 pub max_outstanding: usize,
68}
69
70impl Default for PumpConfig {
71 fn default() -> Self {
72 Self {
73 max_outstanding: crate::package::default_orchestrator_pump_max_outstanding(),
74 }
75 }
76}
77
78#[derive(Clone, Debug)]
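/// Static configuration for a single orchestrator instance: manifest and state locations, listener bind, optional TLS/MCP surfaces, and shutdown/drain budgets.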
80pub struct OrchestratorConfig {
81 pub manifest_path: PathBuf,
82 pub state_dir: PathBuf,
83 pub bind: SocketAddr,
84 pub role: OrchestratorRole,
85 pub watch_manifest: bool,
86 pub mcp: bool,
87 pub mcp_path: String,
88 pub mcp_sse_path: String,
89 pub mcp_messages_path: String,
90 pub tls: Option<TlsFiles>,
91 pub shutdown_timeout: Duration,
92 pub drain: DrainConfig,
93 pub pump: PumpConfig,
94 pub log_format: Option<harn_vm::observability::otel::LogFormat>,
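    /// Clock handed through to connector initialization; defaults to the real
    /// clock, and tests can substitute a deterministic one via `with_clock`.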
97 pub clock: Arc<dyn Clock>,
100}
101
102impl OrchestratorConfig {
103 pub fn for_test(manifest_path: PathBuf, state_dir: PathBuf) -> Self {
105 Self {
106 manifest_path,
107 state_dir,
108 bind: "127.0.0.1:0".parse().unwrap(),
109 role: OrchestratorRole::SingleTenant,
110 watch_manifest: false,
111 mcp: false,
112 mcp_path: "/mcp".to_string(),
113 mcp_sse_path: "/sse".to_string(),
114 mcp_messages_path: "/messages".to_string(),
115 tls: None,
116 shutdown_timeout: Duration::from_secs(5),
117 drain: DrainConfig::default(),
118 pump: PumpConfig::default(),
119 log_format: None,
120 clock: RealClock::arc(),
121 }
122 }
123
124 pub fn with_clock(mut self, clock: Arc<dyn Clock>) -> Self {
129 self.clock = clock;
130 self
131 }
132}
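
// Illustrative sketch (not part of this module): building a test configuration
// with an injected clock. `manifest_path`, `state_dir`, and `FakeClock` are
// assumed test fixtures; any `Arc<dyn Clock>` works with `with_clock`.
//
//     let config = OrchestratorConfig::for_test(manifest_path, state_dir)
//         .with_clock(FakeClock::arc());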
133
134#[derive(Debug)]
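/// Outcome of `OrchestratorHarness::shutdown`; `timed_out` reports whether
/// the graceful drain exceeded its deadline.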
138pub struct ShutdownReport {
139 #[allow(dead_code)]
140 pub timed_out: bool,
141}
142
143#[derive(Debug)]
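/// Simple string-backed error type for harness start/shutdown failures.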
145pub struct HarnessError(pub String);
146
147impl std::fmt::Display for HarnessError {
148 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
149 f.write_str(&self.0)
150 }
151}
152
153impl From<OrchestratorError> for HarnessError {
154 fn from(error: OrchestratorError) -> Self {
155 HarnessError(error.to_string())
156 }
157}
158
159impl From<String> for HarnessError {
160 fn from(s: String) -> Self {
161 HarnessError(s)
162 }
163}
164
165#[allow(dead_code)]
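/// Harness that runs a complete orchestrator (listener, pumps, connectors)
/// on a dedicated background thread with its own Tokio runtime. It exposes
/// the event log, listener address, admin reload handle, the pump drain gate,
/// and an explicit shutdown path; dropping the harness also signals shutdown
/// and joins the background thread.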
172pub struct OrchestratorHarness {
173 event_log: Arc<AnyEventLog>,
174 listener_url: String,
175 local_addr: SocketAddr,
176 state_dir: PathBuf,
177 admin_reload: AdminReloadHandle,
178 shutdown_tx: Arc<watch::Sender<bool>>,
179 pump_drain_gate: PumpDrainGate,
180 join: Option<std::thread::JoinHandle<()>>,
181}
182
183struct ReadyState {
184 event_log: Arc<AnyEventLog>,
185 listener_url: String,
186 local_addr: SocketAddr,
187 state_dir: PathBuf,
188 admin_reload: AdminReloadHandle,
189}
190
191#[derive(Clone)]
192struct PumpDrainGate {
193 hold_tx: watch::Sender<bool>,
194}
195
196impl PumpDrainGate {
197 fn new() -> Self {
198 let (hold_tx, _) = watch::channel(false);
199 Self { hold_tx }
200 }
201
202 fn pause(&self) {
203 let _ = self.hold_tx.send(true);
204 }
205
206 fn release(&self) {
207 let _ = self.hold_tx.send(false);
208 }
209
210 fn subscribe(&self) -> watch::Receiver<bool> {
211 self.hold_tx.subscribe()
212 }
213}
214
215#[allow(dead_code)]
216impl OrchestratorHarness {
217 pub async fn start(config: OrchestratorConfig) -> Result<Self, HarnessError> {
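        // Run the whole lifecycle on a dedicated thread with its own runtime and
        // LocalSet, since the topic pumps rely on spawn_local.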
220 let (ready_tx, ready_rx) = oneshot::channel::<Result<ReadyState, OrchestratorError>>();
221 let (shutdown_tx, shutdown_rx) = watch::channel(false);
222 let shutdown_tx = Arc::new(shutdown_tx);
223 let pump_drain_gate = PumpDrainGate::new();
224 let task_pump_drain_gate = pump_drain_gate.clone();
225
226 let join = std::thread::spawn(move || {
227 let rt = tokio::runtime::Builder::new_multi_thread()
234 .worker_threads(2)
235 .enable_all()
236 .build()
237 .expect("failed to build OrchestratorHarness tokio runtime");
238 let local = tokio::task::LocalSet::new();
239 rt.block_on(local.run_until(orchestrator_task(
240 config,
241 ready_tx,
242 shutdown_rx,
243 task_pump_drain_gate,
244 )));
245 });
246
247 match ready_rx.await {
248 Ok(Ok(ready)) => Ok(Self {
249 event_log: ready.event_log,
250 listener_url: ready.listener_url,
251 local_addr: ready.local_addr,
252 state_dir: ready.state_dir,
253 admin_reload: ready.admin_reload,
254 shutdown_tx,
255 pump_drain_gate,
256 join: Some(join),
257 }),
258 Ok(Err(error)) => {
259 let _ = join.join();
260 Err(HarnessError::from(error))
261 }
262 Err(_) => {
263 let _ = join.join();
264 Err(HarnessError(
265 "harness thread exited before signaling readiness".to_string(),
266 ))
267 }
268 }
269 }
270
271 pub fn listener_url(&self) -> &str {
272 &self.listener_url
273 }
274
275 pub fn local_addr(&self) -> SocketAddr {
276 self.local_addr
277 }
278
279 pub fn event_log(&self) -> Arc<AnyEventLog> {
280 self.event_log.clone()
281 }
282
283 pub fn state_dir(&self) -> &Path {
284 &self.state_dir
285 }
286
287 pub fn admin_reload(&self) -> AdminReloadHandle {
290 self.admin_reload.clone()
291 }
292
293 pub fn shutdown_trigger(&self) -> Arc<watch::Sender<bool>> {
296 self.shutdown_tx.clone()
297 }
298
299 pub fn pause_pump_drain(&self) {
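        // Hold the pump drain gate closed so in-flight pump processing can be
        // observed before `release_pump_drain` lets it continue.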
302 self.pump_drain_gate.pause();
303 }
304
305 pub fn release_pump_drain(&self) {
307 self.pump_drain_gate.release();
308 }
309
310 pub async fn shutdown(mut self, _deadline: Duration) -> Result<ShutdownReport, HarnessError> {
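        // Flip the shutdown watch; the lifecycle loop exits and drains gracefully.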
312 let _ = self.shutdown_tx.send(true);
314 let join = self.join.take().expect("join handle");
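        // Join the harness thread via spawn_blocking so the caller's async
        // runtime is not blocked while the orchestrator finishes draining.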
316 tokio::task::spawn_blocking(move || join.join())
317 .await
318 .map_err(|_| HarnessError("spawn_blocking join failed".to_string()))?
319 .map_err(|_| HarnessError("harness background thread panicked".to_string()))?;
320 Ok(ShutdownReport { timed_out: false })
321 }
322}
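
// Illustrative sketch (assumed test context) of a typical harness lifecycle;
// `config` is an `OrchestratorConfig`, e.g. from `OrchestratorConfig::for_test`.
//
//     let harness = OrchestratorHarness::start(config).await?;
//     let url = harness.listener_url().to_string();
//     // ...exercise the listener and event log...
//     harness.shutdown(Duration::from_secs(5)).await?;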
323
324impl Drop for OrchestratorHarness {
325 fn drop(&mut self) {
326 let _ = self.shutdown_tx.send(true);
327 if let Some(join) = self.join.take() {
328 let _ = join.join();
329 }
330 }
331}
332
333async fn orchestrator_task(
336 config: OrchestratorConfig,
337 ready_tx: oneshot::Sender<Result<ReadyState, OrchestratorError>>,
338 shutdown_rx: watch::Receiver<bool>,
339 pump_drain_gate: PumpDrainGate,
340) {
341 if let Err(error) = orchestrator_lifecycle(config, ready_tx, shutdown_rx, pump_drain_gate).await
342 {
343 eprintln!("[harn] orchestrator harness error: {error}");
344 }
345}
346
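/// Full lifecycle of one orchestrator instance: load the manifest, build the
/// VM and connectors, start the topic pumps and HTTP listener, serve reload
/// requests until shutdown is signalled, then drain everything gracefully.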
347async fn orchestrator_lifecycle(
348 config: OrchestratorConfig,
349 ready_tx: oneshot::Sender<Result<ReadyState, OrchestratorError>>,
350 mut shutdown_rx: watch::Receiver<bool>,
351 pump_drain_gate: PumpDrainGate,
352) -> Result<(), OrchestratorError> {
353 harn_vm::reset_thread_local_state();
354
355 let shutdown_timeout = config.shutdown_timeout;
356 let drain_config = config.drain;
357 let pump_config = PumpConfig {
358 max_outstanding: config.pump.max_outstanding.max(1),
359 };
360
361 let state_dir = config.state_dir.clone();
362 std::fs::create_dir_all(&state_dir).map_err(|error| {
363 format!(
364 "failed to create state dir {}: {error}",
365 state_dir.display()
366 )
367 })?;
368
369 let observability = if let Some(log_format) = config.log_format {
370 Some(
371 harn_vm::observability::otel::ObservabilityGuard::install_orchestrator_subscriber(
372 harn_vm::observability::otel::OrchestratorObservabilityConfig {
373 log_format,
374 state_dir: Some(state_dir.clone()),
375 },
376 )?,
377 )
378 } else {
379 None
380 };
381
382 let config_path = absolutize_from_cwd(&config.manifest_path)?;
383 let (manifest, manifest_dir) = load_manifest(&config_path)?;
384 let drain_config = DrainConfig {
385 max_items: drain_config.max_items.max(1),
386 deadline: drain_config.deadline,
387 };
388 let pump_config = PumpConfig {
389 max_outstanding: pump_config.max_outstanding.max(1),
390 };
391
392 let startup_started_at = now_rfc3339()?;
393 let (admin_reload, mut reload_rx) = AdminReloadHandle::channel();
394
395 eprintln!("[harn] orchestrator manifest: {}", config_path.display());
396 if let Some(name) = manifest
397 .package
398 .as_ref()
399 .and_then(|package| package.name.as_deref())
400 {
401 eprintln!("[harn] orchestrator package: {name}");
402 }
403 eprintln!(
404 "[harn] orchestrator role: {} ({})",
405 config.role.as_str(),
406 config.role.registry_mode()
407 );
408 eprintln!("[harn] orchestrator state dir: {}", state_dir.display());
409 tracing::info!(
410 component = "orchestrator",
411 trace_id = "",
412 role = config.role.as_str(),
413 state_dir = %state_dir.display(),
414 manifest = %config_path.display(),
415 "orchestrator starting"
416 );
417
418 let workspace_root = manifest_dir.clone();
419 let mut vm = config
420 .role
421 .build_vm(&workspace_root, &manifest_dir, &state_dir)?;
422
423 let event_log = harn_vm::event_log::active_event_log()
424 .ok_or_else(|| "event log was not installed during VM initialization".to_string())?;
425 let event_log_description = event_log.describe();
426 let tenant_store = if config.role == OrchestratorRole::MultiTenant {
427 let store = harn_vm::TenantStore::load(&state_dir)?;
428 let active_tenants = store
429 .list()
430 .into_iter()
431 .filter(|tenant| tenant.status == harn_vm::TenantStatus::Active)
432 .collect::<Vec<_>>();
433 eprintln!(
434 "[harn] tenants loaded: {} active ({})",
435 active_tenants.len(),
436 active_tenants
437 .iter()
438 .map(|tenant| tenant.scope.id.0.as_str())
439 .collect::<Vec<_>>()
440 .join(", ")
441 );
442 Some(Arc::new(store))
443 } else {
444 None
445 };
446 eprintln!(
447 "[harn] event log: {} {}",
448 event_log_description.backend,
449 event_log_description
450 .location
451 .as_ref()
452 .map(|path| path.display().to_string())
453 .unwrap_or_else(|| "<memory>".to_string())
454 );
455
456 let secret_namespace = secret_namespace_for(&manifest_dir);
457 let secret_chain_display = configured_secret_chain_display();
458 let secret_chain = harn_vm::secrets::configured_default_chain(secret_namespace.clone())
459 .map_err(|error| format!("failed to configure secret providers: {error}"))?;
460 if secret_chain.providers().is_empty() {
461 return Err("secret provider chain resolved to zero providers"
462 .to_string()
463 .into());
464 }
465 eprintln!(
466 "[harn] secret providers: {} (namespace {})",
467 secret_chain_display, secret_namespace
468 );
469 let secret_provider: Arc<dyn harn_vm::secrets::SecretProvider> = Arc::new(secret_chain);
470
471 let extensions = package::load_runtime_extensions(&config_path);
472 let metrics_registry = Arc::new(harn_vm::MetricsRegistry::default());
473 harn_vm::install_active_metrics_registry(metrics_registry.clone());
474 let collected_triggers = package::collect_manifest_triggers(&mut vm, &extensions)
475 .await
476 .map_err(|error| format!("failed to collect manifest triggers: {error}"))?;
477 package::install_collected_manifest_triggers(&collected_triggers).await?;
478 apply_supervisor_state(&state_dir).await?;
479 eprintln!(
480 "[harn] registered triggers ({}): {}",
481 collected_triggers.len(),
482 format_trigger_summary(&collected_triggers)
483 );
484
485 let binding_versions = live_manifest_binding_versions();
486 let route_configs = build_route_configs(&collected_triggers, &binding_versions)?;
487 let mut connector_runtime = initialize_connectors(
488 &collected_triggers,
489 event_log.clone(),
490 secret_provider.clone(),
491 metrics_registry.clone(),
492 &extensions.provider_connectors,
493 config.clock.clone(),
494 )
495 .await?;
496 let route_configs = attach_route_connectors(
497 route_configs,
498 &connector_runtime.registry,
499 &extensions.provider_connectors,
500 )?;
501 let connector_clients = connector_runtime.registry.client_map().await;
502 harn_vm::install_active_connector_clients(connector_clients);
503 eprintln!(
504 "[harn] registered connectors ({}): {}",
505 connector_runtime.providers.len(),
506 connector_runtime.providers.join(", ")
507 );
508 eprintln!(
509 "[harn] activated connectors: {}",
510 format_activation_summary(&connector_runtime.activations)
511 );
512 let (mcp_router, mcp_service) = if config.mcp {
513 validate_mcp_paths(
514 &config.mcp_path,
515 &config.mcp_sse_path,
516 &config.mcp_messages_path,
517 )?;
518 if !has_orchestrator_api_keys_configured() && !has_mcp_oauth_configured() {
519 return Err(OrchestratorError::Serve(
520 "--mcp requires HARN_ORCHESTRATOR_API_KEYS or HARN_MCP_OAUTH_AUTHORIZATION_SERVERS so the embedded MCP management surface is authenticated"
521 .to_string(),
522 ));
523 }
524 let service = Arc::new(
525 crate::commands::mcp::serve::McpOrchestratorService::new_local(
526 crate::cli::OrchestratorLocalArgs {
527 config: config_path.clone(),
528 state_dir: state_dir.clone(),
529 },
530 )?,
531 );
532 let router = crate::commands::mcp::serve::http_router_for_service(
533 service.clone(),
534 config.mcp_path.clone(),
535 config.mcp_sse_path.clone(),
536 config.mcp_messages_path.clone(),
537 );
538 eprintln!(
539 "[harn] embedded MCP server mounted at {} (legacy SSE {}, messages {})",
540 config.mcp_path, config.mcp_sse_path, config.mcp_messages_path
541 );
542 (Some(router), Some(service))
543 } else {
544 (None, None)
545 };
546
547 let dispatcher = harn_vm::Dispatcher::with_event_log_and_metrics(
548 vm,
549 event_log.clone(),
550 Some(metrics_registry.clone()),
551 );
552 let mut pending_pumps = vec![(
553 PENDING_TOPIC.to_string(),
554 spawn_pending_pump(
555 event_log.clone(),
556 dispatcher.clone(),
557 pump_config,
558 metrics_registry.clone(),
559 pump_drain_gate.clone(),
560 PENDING_TOPIC,
561 )?,
562 )];
563 let mut inbox_pumps = vec![(
564 harn_vm::TRIGGER_INBOX_ENVELOPES_TOPIC.to_string(),
565 spawn_inbox_pump(
566 event_log.clone(),
567 dispatcher.clone(),
568 pump_config,
569 metrics_registry.clone(),
570 harn_vm::TRIGGER_INBOX_ENVELOPES_TOPIC,
571 )?,
572 )];
573 if let Some(store) = tenant_store.as_ref() {
574 for tenant in store
575 .list()
576 .into_iter()
577 .filter(|tenant| tenant.status == harn_vm::TenantStatus::Active)
578 {
579 let pending_topic = harn_vm::tenant_topic(
580 &tenant.scope.id,
581 &harn_vm::event_log::Topic::new(PENDING_TOPIC)
582 .map_err(|error| error.to_string())?,
583 )
584 .map_err(|error| error.to_string())?;
585 pending_pumps.push((
586 pending_topic.as_str().to_string(),
587 spawn_pending_pump(
588 event_log.clone(),
589 dispatcher.clone(),
590 pump_config,
591 metrics_registry.clone(),
592 pump_drain_gate.clone(),
593 pending_topic.as_str(),
594 )?,
595 ));
596 let inbox_topic = harn_vm::tenant_topic(
597 &tenant.scope.id,
598 &harn_vm::event_log::Topic::new(harn_vm::TRIGGER_INBOX_ENVELOPES_TOPIC)
599 .map_err(|error| error.to_string())?,
600 )
601 .map_err(|error| error.to_string())?;
602 inbox_pumps.push((
603 inbox_topic.as_str().to_string(),
604 spawn_inbox_pump(
605 event_log.clone(),
606 dispatcher.clone(),
607 pump_config,
608 metrics_registry.clone(),
609 inbox_topic.as_str(),
610 )?,
611 ));
612 }
613 }
614 let cron_pump = spawn_cron_pump(
615 event_log.clone(),
616 dispatcher.clone(),
617 pump_config,
618 metrics_registry.clone(),
619 pump_drain_gate.clone(),
620 )?;
621 let waitpoint_pump = spawn_waitpoint_resume_pump(
622 event_log.clone(),
623 dispatcher.clone(),
624 pump_config,
625 metrics_registry.clone(),
626 pump_drain_gate.clone(),
627 )?;
628 let waitpoint_cancel_pump = spawn_waitpoint_cancel_pump(
629 event_log.clone(),
630 dispatcher.clone(),
631 pump_config,
632 metrics_registry.clone(),
633 pump_drain_gate.clone(),
634 )?;
635 let waitpoint_sweeper = spawn_waitpoint_sweeper(dispatcher.clone());
636
637 let listener = ListenerRuntime::start(ListenerConfig {
638 bind: config.bind,
639 tls: config.tls.clone(),
640 event_log: event_log.clone(),
641 secrets: secret_provider.clone(),
642 allowed_origins: OriginAllowList::from_manifest(&manifest.orchestrator.allowed_origins),
643 max_body_bytes: ListenerConfig::max_body_bytes_or_default(
644 manifest.orchestrator.max_body_bytes,
645 ),
646 metrics_registry: metrics_registry.clone(),
647 admin_reload: Some(admin_reload.clone()),
648 mcp_router,
649 routes: route_configs,
650 tenant_store: tenant_store.clone(),
651 session_store: Some(Arc::new(harn_vm::SessionStore::new(event_log.clone()))),
652 })
653 .await?;
654 let local_bind = listener.local_addr();
655 let listener_metrics = listener.trigger_metrics();
656 let mut live_manifest = manifest;
657 let mut live_triggers = collected_triggers;
658 let _manifest_watcher = if config.watch_manifest {
659 Some(spawn_manifest_watcher(
660 config_path.clone(),
661 admin_reload.clone(),
662 )?)
663 } else {
664 None
665 };
666 connector_runtime.activations = connector_runtime
667 .registry
668 .activate_all(&connector_runtime.trigger_registry)
669 .await
670 .map_err(|error| error.to_string())?;
671 eprintln!(
672 "[harn] activated connectors: {}",
673 format_activation_summary(&connector_runtime.activations)
674 );
675
676 listener.mark_ready();
677 eprintln!("[harn] HTTP listener ready on {}", listener.url());
678 tracing::info!(
679 component = "orchestrator",
680 trace_id = "",
681 listener_url = %listener.url(),
682 "HTTP listener ready"
683 );
684
685 write_state_snapshot(
686 &state_dir.join(STATE_SNAPSHOT_FILE),
687 &ServeStateSnapshot {
688 status: "running".to_string(),
689 role: config.role.as_str().to_string(),
690 bind: local_bind.to_string(),
691 listener_url: listener.url(),
692 manifest_path: config_path.display().to_string(),
693 state_dir: state_dir.display().to_string(),
694 started_at: startup_started_at.clone(),
695 stopped_at: None,
696 secret_provider_chain: secret_chain_display.clone(),
697 event_log_backend: event_log_description.backend.to_string(),
698 event_log_location: event_log_description
699 .location
700 .as_ref()
701 .map(|path| path.display().to_string()),
702 triggers: trigger_state_snapshots(&live_triggers, &listener_metrics),
703 connectors: connector_runtime.providers.clone(),
704 activations: connector_runtime
705 .activations
706 .iter()
707 .map(|activation| ConnectorActivationSnapshot {
708 provider: activation.provider.as_str().to_string(),
709 binding_count: activation.binding_count,
710 })
711 .collect(),
712 },
713 )?;
714
715 append_lifecycle_event(
716 &event_log,
717 "startup",
718 json!({
719 "bind": local_bind.to_string(),
720 "manifest": config_path.display().to_string(),
721 "role": config.role.as_str(),
722 "state_dir": state_dir.display().to_string(),
723 "trigger_count": live_triggers.len(),
724 "connector_count": connector_runtime.providers.len(),
725 "tls_enabled": listener.scheme() == "https",
726 "shutdown_timeout_secs": shutdown_timeout.as_secs(),
727 "drain_max_items": drain_config.max_items,
728 "drain_deadline_secs": drain_config.deadline.as_secs(),
729 "pump_max_outstanding": pump_config.max_outstanding,
730 }),
731 )
732 .await?;
733
734 let stranded = stranded_envelopes(&event_log, Duration::ZERO).await?;
735 if !stranded.is_empty() {
736 eprintln!(
737 "[harn] startup found {} stranded inbox envelope(s); inspect with `harn orchestrator queue` and recover explicitly with `harn orchestrator recover --dry-run --envelope-age ...`",
738 stranded.len()
739 );
740 }
741 append_lifecycle_event(
742 &event_log,
743 "startup_stranded_envelopes",
744 json!({
745 "count": stranded.len(),
746 }),
747 )
748 .await?;
749
750 let _ = ready_tx.send(Ok(ReadyState {
752 event_log: event_log.clone(),
753 listener_url: listener.url(),
754 local_addr: local_bind,
755 state_dir: state_dir.clone(),
756 admin_reload: admin_reload.clone(),
757 }));
758
759 let mut ctx = RuntimeCtx {
761 role: config.role,
762 config_path: &config_path,
763 state_dir: &state_dir,
764 bind: local_bind,
765 startup_started_at: &startup_started_at,
766 event_log: &event_log,
767 event_log_description: &event_log_description,
768 secret_chain_display: &secret_chain_display,
769 listener: &listener,
770 connectors: &mut connector_runtime,
771 live_manifest: &mut live_manifest,
772 live_triggers: &mut live_triggers,
773 secret_provider: &secret_provider,
774 metrics_registry: &metrics_registry,
775 mcp_service: mcp_service.as_ref(),
776 clock: config.clock.clone(),
777 reload_rx: &mut reload_rx,
778 };
779
780 loop {
781 tokio::select! {
782 changed = shutdown_rx.changed() => {
783 if changed.is_err() || *shutdown_rx.borrow() {
784 break;
785 }
786 }
787 Some(request) = ctx.reload_rx.recv() => {
788 handle_reload_request(&mut ctx, request).await?;
789 }
790 }
791 }
792
793 listener.mark_not_ready();
794 let shutdown = graceful_shutdown(
795 GracefulShutdownCtx {
796 role: config.role,
797 bind: local_bind,
798 listener_url: listener.url(),
799 config_path: &config_path,
800 state_dir: &state_dir,
801 startup_started_at: &startup_started_at,
802 event_log: &event_log,
803 event_log_description: &event_log_description,
804 secret_chain_display: &secret_chain_display,
805 triggers: &live_triggers,
806 connectors: &connector_runtime,
807 shutdown_timeout,
808 drain_config,
809 },
810 listener,
811 dispatcher,
812 pending_pumps,
813 cron_pump,
814 inbox_pumps,
815 waitpoint_pump,
816 waitpoint_cancel_pump,
817 waitpoint_sweeper,
818 )
819 .await;
820
821 if let Some(obs) = observability {
822 if let Err(error) = obs.shutdown() {
823 if shutdown.is_ok() {
824 return Err(OrchestratorError::Serve(error));
825 }
826 eprintln!("[harn] observability shutdown warning: {error}");
827 }
828 }
829 harn_vm::clear_active_metrics_registry();
830 shutdown
831}
832
833struct ConnectorRuntime {
836 registry: harn_vm::ConnectorRegistry,
837 trigger_registry: harn_vm::TriggerRegistry,
838 handles: Vec<harn_vm::connectors::ConnectorHandle>,
839 providers: Vec<String>,
840 activations: Vec<harn_vm::ActivationHandle>,
841 #[cfg_attr(not(unix), allow(dead_code))]
842 provider_overrides: Vec<ResolvedProviderConnectorConfig>,
843}
844
845#[cfg_attr(not(unix), allow(dead_code))]
846#[derive(Clone, Debug, Default, Serialize)]
847struct ManifestReloadSummary {
848 added: Vec<String>,
849 modified: Vec<String>,
850 removed: Vec<String>,
851 unchanged: Vec<String>,
852}
853
854#[derive(Clone, Copy, Debug, PartialEq, Eq)]
855enum PumpMode {
856 Running,
857 Draining(PumpDrainRequest),
858}
859
860#[derive(Clone, Copy, Debug, PartialEq, Eq)]
861struct PumpDrainRequest {
862 up_to: u64,
863 config: DrainConfig,
864 deadline: tokio::time::Instant,
865}
866
867#[derive(Clone, Copy, Debug, PartialEq, Eq)]
868enum PumpDrainStopReason {
869 Drained,
870 MaxItems,
871 Deadline,
872 Error,
873}
874
875impl PumpDrainStopReason {
876 fn as_str(self) -> &'static str {
877 match self {
878 Self::Drained => "drained",
879 Self::MaxItems => "max_items",
880 Self::Deadline => "deadline",
881 Self::Error => "error",
882 }
883 }
884}
885
886#[derive(Clone, Copy, Debug, Default)]
887struct PumpStats {
888 last_seen: u64,
889 processed: u64,
890}
891
892#[derive(Clone, Copy, Debug)]
893struct PumpDrainProgress {
894 request: PumpDrainRequest,
895 start_seen: u64,
896}
897
898#[derive(Clone, Copy, Debug)]
899struct PumpDrainReport {
900 stats: PumpStats,
901 drain_items: u64,
902 remaining_queued: u64,
903 outstanding_tasks: usize,
904 stop_reason: PumpDrainStopReason,
905}
906
907impl PumpDrainReport {
908 fn truncated(self) -> bool {
909 self.remaining_queued > 0 || self.outstanding_tasks > 0
910 }
911}
912
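/// Handle to a spawned topic pump: `mode_tx` switches the pump from running to
/// draining, and `join` resolves with the pump's final drain report.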
913struct PumpHandle {
914 mode_tx: watch::Sender<PumpMode>,
915 join: tokio::task::JoinHandle<Result<PumpDrainReport, OrchestratorError>>,
916}
917
918impl PumpHandle {
919 async fn drain(
920 self,
921 log: &Arc<AnyEventLog>,
922 topic_name: &str,
923 up_to: u64,
924 config: DrainConfig,
925 overall_deadline: tokio::time::Instant,
926 ) -> Result<PumpDrainReport, OrchestratorError> {
927 let drain_deadline = std::cmp::min(
928 tokio::time::Instant::now() + config.deadline,
929 overall_deadline,
930 );
931 let _ = self.mode_tx.send(PumpMode::Draining(PumpDrainRequest {
932 up_to,
933 config,
934 deadline: drain_deadline,
935 }));
936 append_pump_lifecycle_event(
937 log,
938 "pump_drain_started",
939 json!({
940 "topic": topic_name,
941 "up_to": up_to,
942 "max_items": config.max_items,
943 "drain_deadline_ms": config.deadline.as_millis(),
944 }),
945 )
946 .await?;
947 match self.join.await {
948 Ok(result) => result,
949 Err(error) => Err(format!("pump task join failed: {error}").into()),
950 }
951 }
952}
953
954struct WaitpointSweepHandle {
955 stop_tx: watch::Sender<bool>,
956 join: tokio::task::JoinHandle<Result<(), OrchestratorError>>,
957}
958
959impl WaitpointSweepHandle {
960 async fn shutdown(self) -> Result<(), OrchestratorError> {
961 let _ = self.stop_tx.send(true);
962 match self.join.await {
963 Ok(result) => result,
964 Err(error) => Err(format!("waitpoint sweeper join failed: {error}").into()),
965 }
966 }
967}
968
969#[derive(Debug, Deserialize)]
970struct PendingTriggerRecord {
971 trigger_id: String,
972 binding_version: u32,
973 event: harn_vm::TriggerEvent,
974}
975
976fn spawn_pending_pump(
979 event_log: Arc<harn_vm::event_log::AnyEventLog>,
980 dispatcher: harn_vm::Dispatcher,
981 pump_config: PumpConfig,
982 metrics_registry: Arc<harn_vm::MetricsRegistry>,
983 pump_drain_gate: PumpDrainGate,
984 topic_name: &str,
985) -> Result<PumpHandle, OrchestratorError> {
986 let topic = harn_vm::event_log::Topic::new(topic_name).map_err(|error| error.to_string())?;
987 spawn_topic_pump(
988 event_log,
989 topic,
990 pump_config,
991 metrics_registry,
992 pump_drain_gate,
993 move |logged| {
994 let dispatcher = dispatcher.clone();
995 async move {
996 if pending_pump_test_should_fail() {
997 return Err("test pending pump failure".to_string().into());
998 }
999 if logged.kind != "trigger_event" {
1000 return Ok(false);
1001 }
1002 let record: PendingTriggerRecord = serde_json::from_value(logged.payload)
1003 .map_err(|error| format!("failed to decode pending trigger event: {error}"))?;
1004 dispatcher
1005 .enqueue_targeted_with_headers(
1006 Some(record.trigger_id),
1007 Some(record.binding_version),
1008 record.event,
1009 Some(&logged.headers),
1010 )
1011 .await
1012 .map_err(|error| format!("failed to enqueue pending trigger event: {error}"))?;
1013 Ok(true)
1014 }
1015 },
1016 )
1017}
1018
1019fn spawn_cron_pump(
1020 event_log: Arc<harn_vm::event_log::AnyEventLog>,
1021 dispatcher: harn_vm::Dispatcher,
1022 pump_config: PumpConfig,
1023 metrics_registry: Arc<harn_vm::MetricsRegistry>,
1024 pump_drain_gate: PumpDrainGate,
1025) -> Result<PumpHandle, OrchestratorError> {
1026 let topic =
1027 harn_vm::event_log::Topic::new(CRON_TICK_TOPIC).map_err(|error| error.to_string())?;
1028 spawn_topic_pump(
1029 event_log,
1030 topic,
1031 pump_config,
1032 metrics_registry,
1033 pump_drain_gate,
1034 move |logged| {
1035 let dispatcher = dispatcher.clone();
1036 async move {
1037 if logged.kind != "trigger_event" {
1038 return Ok(false);
1039 }
1040 let event: harn_vm::TriggerEvent = serde_json::from_value(logged.payload)
1041 .map_err(|error| format!("failed to decode cron trigger event: {error}"))?;
1042 let trigger_id = match &event.provider_payload {
1043 harn_vm::ProviderPayload::Known(
1044 harn_vm::triggers::event::KnownProviderPayload::Cron(payload),
1045 ) => payload.cron_id.clone(),
1046 _ => None,
1047 };
1048 dispatcher
1049 .enqueue_targeted_with_headers(trigger_id, None, event, Some(&logged.headers))
1050 .await
1051 .map_err(|error| format!("failed to enqueue cron trigger event: {error}"))?;
1052 Ok(true)
1053 }
1054 },
1055 )
1056}
1057
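/// Inbox pump: unlike the generic `spawn_topic_pump`, dispatches run
/// concurrently in a `JoinSet` bounded by `max_outstanding`, and the consumer
/// cursor is acked once a dispatch task has been spawned rather than when the
/// dispatch completes.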
1058fn spawn_inbox_pump(
1059 event_log: Arc<harn_vm::event_log::AnyEventLog>,
1060 dispatcher: harn_vm::Dispatcher,
1061 pump_config: PumpConfig,
1062 metrics_registry: Arc<harn_vm::MetricsRegistry>,
1063 topic_name: &str,
1064) -> Result<PumpHandle, OrchestratorError> {
1065 let topic = harn_vm::event_log::Topic::new(topic_name).map_err(|error| error.to_string())?;
1066 let consumer = pump_consumer_id(&topic)?;
1067 let inbox_task_release_file = inbox_task_test_release_file();
1068 let (mode_tx, mut mode_rx) = watch::channel(PumpMode::Running);
1069 let join = tokio::task::spawn_local(async move {
1070 let start_from = event_log
1071 .consumer_cursor(&topic, &consumer)
1072 .await
1073 .map_err(|error| format!("failed to read consumer cursor for {topic}: {error}"))?
1074 .or(event_log
1075 .latest(&topic)
1076 .await
1077 .map_err(|error| format!("failed to read topic head {topic}: {error}"))?);
1078 let mut stream = event_log
1079 .clone()
1080 .subscribe(&topic, start_from)
1081 .await
1082 .map_err(|error| format!("failed to subscribe topic {topic}: {error}"))?;
1083 let mut stats = PumpStats {
1084 last_seen: start_from.unwrap_or(0),
1085 processed: 0,
1086 };
1087 record_pump_metrics(&metrics_registry, &event_log, &topic, stats.last_seen, 0).await?;
1088 let mut drain_progress = None;
1089 let mut tasks = JoinSet::new();
1090
1091 loop {
1092 if let Some(progress) = drain_progress {
1093 if let Some(report) = maybe_finish_pump_drain(stats, progress, tasks.len()) {
1094 return Ok(report);
1095 }
1096 }
1097
1098 let deadline = drain_progress.map(|progress| progress.request.deadline);
1099 let mut deadline_wait = Box::pin(async move {
1100 if let Some(deadline) = deadline {
1101 tokio::time::sleep_until(deadline).await;
1102 } else {
1103 std::future::pending::<()>().await;
1104 }
1105 });
1106
1107 tokio::select! {
1108 changed = mode_rx.changed() => {
1109 if changed.is_err() {
1110 break;
1111 }
1112 if let PumpMode::Draining(request) = *mode_rx.borrow() {
1113 drain_progress.get_or_insert(PumpDrainProgress {
1114 request,
1115 start_seen: stats.last_seen,
1116 });
1117 }
1118 }
1119 _ = &mut deadline_wait => {
1120 if let Some(progress) = drain_progress {
1121 return Ok(pump_drain_report(
1122 stats,
1123 progress.start_seen,
1124 progress.request.up_to,
1125 tasks.len(),
1126 PumpDrainStopReason::Deadline,
1127 ));
1128 }
1129 }
1130 joined = tasks.join_next(), if !tasks.is_empty() => {
1131 match joined {
1132 Some(Ok(())) => {
1133 record_pump_metrics(
1134 &metrics_registry,
1135 &event_log,
1136 &topic,
1137 stats.last_seen,
1138 tasks.len(),
1139 )
1140 .await?;
1141 }
1142 Some(Err(error)) => {
1143 return Err(format!("inbox dispatch task join failed: {error}").into());
1144 }
1145 None => {}
1146 }
1147 }
1148 _ = tokio::time::sleep(Duration::from_millis(25)), if tasks.len() >= pump_config.max_outstanding => {
1149 record_pump_metrics(&metrics_registry, &event_log, &topic, stats.last_seen, tasks.len()).await?;
1150 }
1151 received = stream.next(), if tasks.len() < pump_config.max_outstanding => {
1152 let Some(received) = received else {
1153 break;
1154 };
1155 let (event_id, logged) = received
1156 .map_err(|error| format!("topic pump read failed for {topic}: {error}"))?;
1157 if logged.kind != "event_ingested" {
1158 stats.last_seen = event_id;
1159 event_log
1160 .ack(&topic, &consumer, event_id)
1161 .await
1162 .map_err(|error| format!("failed to ack topic pump cursor for {topic}: {error}"))?;
1163 record_pump_metrics(&metrics_registry, &event_log, &topic, stats.last_seen, tasks.len()).await?;
1164 continue;
1165 }
1166 append_pump_lifecycle_event(
1167 &event_log,
1168 "pump_received",
1169 json!({
1170 "topic": topic.as_str(),
1171 "event_log_id": event_id,
1172 "outstanding": tasks.len(),
1173 "max_outstanding": pump_config.max_outstanding,
1174 }),
1175 )
1176 .await?;
1177 let envelope: harn_vm::triggers::dispatcher::InboxEnvelope =
1178 serde_json::from_value(logged.payload)
1179 .map_err(|error| format!("failed to decode dispatcher inbox event: {error}"))?;
1180 let trigger_id = envelope.trigger_id.clone();
1181 let binding_version = envelope.binding_version;
1182 let trigger_event_id = envelope.event.id.0.clone();
1183 let parent_headers = logged.headers.clone();
1184 append_pump_lifecycle_event(
1185 &event_log,
1186 "pump_eligible",
1187 json!({
1188 "topic": topic.as_str(),
1189 "event_log_id": event_id,
1190 "trigger_id": trigger_id.clone(),
1191 "binding_version": binding_version,
1192 "trigger_event_id": trigger_event_id,
1193 }),
1194 )
1195 .await?;
1196 metrics_registry.record_orchestrator_pump_admission_delay(
1197 topic.as_str(),
1198 admission_delay(logged.occurred_at_ms),
1199 );
1200 append_pump_lifecycle_event(
1201 &event_log,
1202 "pump_admitted",
1203 json!({
1204 "topic": topic.as_str(),
1205 "event_log_id": event_id,
1206 "outstanding_after_admit": tasks.len() + 1,
1207 "max_outstanding": pump_config.max_outstanding,
1208 "trigger_id": trigger_id.clone(),
1209 "binding_version": binding_version,
1210 "trigger_event_id": trigger_event_id,
1211 }),
1212 )
1213 .await?;
1214 let dispatcher = dispatcher.clone();
1215 let task_event_log = event_log.clone();
1216 let task_topic = topic.as_str().to_string();
1217 let inbox_task_release_file = inbox_task_release_file.clone();
1218 tasks.spawn_local(async move {
1219 if let Some(path) = inbox_task_release_file.as_ref() {
1220 wait_for_test_release_file(path).await;
1221 }
1222 let _ = append_pump_lifecycle_event(
1223 &task_event_log,
1224 "pump_dispatch_started",
1225 json!({
1226 "topic": task_topic.clone(),
1227 "event_log_id": event_id,
1228 "trigger_id": trigger_id,
1229 "binding_version": binding_version,
1230 "trigger_event_id": trigger_event_id,
1231 }),
1232 )
1233 .await;
1234 let result = dispatcher
1235 .dispatch_inbox_envelope_with_parent_headers(
1236 envelope,
1237 &parent_headers,
1238 )
1239 .await;
1240 let (status, error_message) = match result {
1241 Ok(_) => ("completed", None),
1242 Err(error) => {
1243 let message = error.to_string();
1244 eprintln!("[harn] inbox dispatch warning: {message}");
1245 ("failed", Some(message))
1246 }
1247 };
1248 let _ = append_pump_lifecycle_event(
1249 &task_event_log,
1250 "pump_dispatch_completed",
1251 json!({
1252 "topic": task_topic,
1253 "event_log_id": event_id,
1254 "status": status,
1255 "error": error_message,
1256 }),
1257 )
1258 .await;
1259 });
1260 stats.last_seen = event_id;
1261 stats.processed += 1;
1262 event_log
1263 .ack(&topic, &consumer, event_id)
1264 .await
1265 .map_err(|error| format!("failed to ack topic pump cursor for {topic}: {error}"))?;
1266 append_pump_lifecycle_event(
1267 &event_log,
1268 "pump_acked",
1269 json!({
1270 "topic": topic.as_str(),
1271 "event_log_id": event_id,
1272 "cursor": event_id,
1273 }),
1274 )
1275 .await?;
1276 record_pump_metrics(&metrics_registry, &event_log, &topic, stats.last_seen, tasks.len()).await?;
1277 }
1278 }
1279 }
1280
1281 while let Some(joined) = tasks.join_next().await {
1282 joined.map_err(|error| format!("inbox dispatch task join failed: {error}"))?;
1283 record_pump_metrics(
1284 &metrics_registry,
1285 &event_log,
1286 &topic,
1287 stats.last_seen,
1288 tasks.len(),
1289 )
1290 .await?;
1291 }
1292
1293 Ok(drain_progress
1294 .map(|progress| {
1295 pump_drain_report(
1296 stats,
1297 progress.start_seen,
1298 progress.request.up_to,
1299 0,
1300 PumpDrainStopReason::Drained,
1301 )
1302 })
1303 .unwrap_or_else(|| PumpDrainReport {
1304 stats,
1305 drain_items: 0,
1306 remaining_queued: 0,
1307 outstanding_tasks: 0,
1308 stop_reason: PumpDrainStopReason::Drained,
1309 }))
1310 });
1311 Ok(PumpHandle { mode_tx, join })
1312}
1313
1314fn spawn_waitpoint_resume_pump(
1315 event_log: Arc<harn_vm::event_log::AnyEventLog>,
1316 dispatcher: harn_vm::Dispatcher,
1317 pump_config: PumpConfig,
1318 metrics_registry: Arc<harn_vm::MetricsRegistry>,
1319 pump_drain_gate: PumpDrainGate,
1320) -> Result<PumpHandle, OrchestratorError> {
1321 let topic = harn_vm::event_log::Topic::new(harn_vm::WAITPOINT_RESUME_TOPIC)
1322 .map_err(|error| error.to_string())?;
1323 spawn_topic_pump(
1324 event_log,
1325 topic,
1326 pump_config,
1327 metrics_registry,
1328 pump_drain_gate,
1329 move |logged| {
1330 let dispatcher = dispatcher.clone();
1331 async move {
1332 harn_vm::process_waitpoint_resume_event(&dispatcher, logged)
1333 .await
1334 .map_err(OrchestratorError::from)
1335 }
1336 },
1337 )
1338}
1339
1340fn spawn_waitpoint_cancel_pump(
1341 event_log: Arc<harn_vm::event_log::AnyEventLog>,
1342 dispatcher: harn_vm::Dispatcher,
1343 pump_config: PumpConfig,
1344 metrics_registry: Arc<harn_vm::MetricsRegistry>,
1345 pump_drain_gate: PumpDrainGate,
1346) -> Result<PumpHandle, OrchestratorError> {
1347 let topic = harn_vm::event_log::Topic::new(harn_vm::TRIGGER_CANCEL_REQUESTS_TOPIC)
1348 .map_err(|error| error.to_string())?;
1349 spawn_topic_pump(
1350 event_log,
1351 topic,
1352 pump_config,
1353 metrics_registry,
1354 pump_drain_gate,
1355 move |logged| {
1356 let dispatcher = dispatcher.clone();
1357 async move {
1358 if logged.kind != "dispatch_cancel_requested" {
1359 return Ok(false);
1360 }
1361 harn_vm::service_waitpoints_once(&dispatcher, None)
1362 .await
1363 .map_err(|error| {
1364 OrchestratorError::Serve(format!(
1365 "failed to service waitpoints on cancel: {error}"
1366 ))
1367 })?;
1368 Ok(true)
1369 }
1370 },
1371 )
1372}
1373
1374fn spawn_waitpoint_sweeper(dispatcher: harn_vm::Dispatcher) -> WaitpointSweepHandle {
1375 let (stop_tx, mut stop_rx) = watch::channel(false);
1376 let join = tokio::task::spawn_local(async move {
1377 let mut interval = tokio::time::interval(WAITPOINT_SERVICE_INTERVAL);
1378 interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
1379 loop {
1380 tokio::select! {
1381 changed = stop_rx.changed() => {
1382 if changed.is_err() || *stop_rx.borrow() {
1383 break;
1384 }
1385 }
1386 _ = interval.tick() => {
1387 harn_vm::service_waitpoints_once(&dispatcher, None)
1388 .await
1389 .map_err(|error| format!("failed to service waitpoints on sweep: {error}"))?;
1390 }
1391 }
1392 }
1393 Ok(())
1394 });
1395 WaitpointSweepHandle { stop_tx, join }
1396}
1397
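/// Generic serial topic pump: resume from the consumer cursor (or the topic
/// head on first run), pass each event to `process`, ack the cursor, and
/// honor drain requests and the pump drain gate along the way.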
1398fn spawn_topic_pump<F, Fut>(
1399 event_log: Arc<harn_vm::event_log::AnyEventLog>,
1400 topic: harn_vm::event_log::Topic,
1401 _pump_config: PumpConfig,
1402 metrics_registry: Arc<harn_vm::MetricsRegistry>,
1403 pump_drain_gate: PumpDrainGate,
1404 process: F,
1405) -> Result<PumpHandle, OrchestratorError>
1406where
1407 F: Fn(harn_vm::event_log::LogEvent) -> Fut + 'static,
1408 Fut: std::future::Future<Output = Result<bool, OrchestratorError>> + 'static,
1409{
1410 let consumer = pump_consumer_id(&topic)?;
1411 let mut pump_drain_gate_rx = pump_drain_gate.subscribe();
1412 let (mode_tx, mut mode_rx) = watch::channel(PumpMode::Running);
1413 let join = tokio::task::spawn_local(async move {
1414 let start_from = event_log
1415 .consumer_cursor(&topic, &consumer)
1416 .await
1417 .map_err(|error| format!("failed to read consumer cursor for {topic}: {error}"))?
1418 .or(event_log
1419 .latest(&topic)
1420 .await
1421 .map_err(|error| format!("failed to read topic head {topic}: {error}"))?);
1422 let mut stream = event_log
1423 .clone()
1424 .subscribe(&topic, start_from)
1425 .await
1426 .map_err(|error| format!("failed to subscribe topic {topic}: {error}"))?;
1427 let mut stats = PumpStats {
1428 last_seen: start_from.unwrap_or(0),
1429 processed: 0,
1430 };
1431 record_pump_metrics(&metrics_registry, &event_log, &topic, stats.last_seen, 0).await?;
1432 let mut drain_progress = None;
1433 loop {
1434 if let Some(progress) = drain_progress {
1435 if let Some(report) = maybe_finish_pump_drain(stats, progress, 0) {
1436 return Ok(report);
1437 }
1438 }
1439 let deadline = drain_progress.map(|progress| progress.request.deadline);
1440 let mut deadline_wait = Box::pin(async move {
1441 if let Some(deadline) = deadline {
1442 tokio::time::sleep_until(deadline).await;
1443 } else {
1444 std::future::pending::<()>().await;
1445 }
1446 });
1447 tokio::select! {
1448 changed = mode_rx.changed() => {
1449 if changed.is_err() {
1450 break;
1451 }
1452 if let PumpMode::Draining(request) = *mode_rx.borrow() {
1453 drain_progress.get_or_insert(PumpDrainProgress {
1454 request,
1455 start_seen: stats.last_seen,
1456 });
1457 }
1458 }
1459 _ = &mut deadline_wait => {
1460 if let Some(progress) = drain_progress {
1461 return Ok(pump_drain_report(
1462 stats,
1463 progress.start_seen,
1464 progress.request.up_to,
1465 0,
1466 PumpDrainStopReason::Deadline,
1467 ));
1468 }
1469 }
1470 received = stream.next() => {
1471 let Some(received) = received else {
1472 break;
1473 };
1474 let (event_id, logged) = received
1475 .map_err(|error| format!("topic pump read failed for {topic}: {error}"))?;
1476 record_pump_metrics(&metrics_registry, &event_log, &topic, stats.last_seen, 1).await?;
1477 metrics_registry.record_orchestrator_pump_admission_delay(
1478 topic.as_str(),
1479 admission_delay(logged.occurred_at_ms),
1480 );
1481 wait_for_pump_drain_release(
1482 &event_log,
1483 &topic,
1484 event_id,
1485 &mut pump_drain_gate_rx,
1486 )
1487 .await?;
1488 let handled = process(logged).await?;
1489 stats.last_seen = event_id;
1490 if handled {
1491 stats.processed += 1;
1492 }
1493 event_log
1494 .ack(&topic, &consumer, event_id)
1495 .await
1496 .map_err(|error| format!("failed to ack topic pump cursor for {topic}: {error}"))?;
1497 record_pump_metrics(&metrics_registry, &event_log, &topic, stats.last_seen, 0).await?;
1498 }
1499 }
1500 }
1501 Ok(drain_progress
1502 .map(|progress| {
1503 pump_drain_report(
1504 stats,
1505 progress.start_seen,
1506 progress.request.up_to,
1507 0,
1508 PumpDrainStopReason::Drained,
1509 )
1510 })
1511 .unwrap_or_else(|| PumpDrainReport {
1512 stats,
1513 drain_items: 0,
1514 remaining_queued: 0,
1515 outstanding_tasks: 0,
1516 stop_reason: PumpDrainStopReason::Drained,
1517 }))
1518 });
1519 Ok(PumpHandle { mode_tx, join })
1520}
1521
1522#[allow(clippy::too_many_arguments)]
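/// Drain sequence for shutdown: stop accepting HTTP, shut connectors down,
/// drain every topic pump within the drain budget, drain the dispatcher, then
/// record the final lifecycle event and state snapshot.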
1525async fn graceful_shutdown(
1526 ctx: GracefulShutdownCtx<'_>,
1527 listener: ListenerRuntime,
1528 dispatcher: harn_vm::Dispatcher,
1529 pending_pumps: Vec<(String, PumpHandle)>,
1530 cron_pump: PumpHandle,
1531 inbox_pumps: Vec<(String, PumpHandle)>,
1532 waitpoint_pump: PumpHandle,
1533 waitpoint_cancel_pump: PumpHandle,
1534 waitpoint_sweeper: WaitpointSweepHandle,
1535) -> Result<(), OrchestratorError> {
1536 eprintln!("[harn] signal received, starting graceful shutdown...");
1537 tracing::info!(
1538 component = "orchestrator",
1539 trace_id = "",
1540 shutdown_timeout_secs = ctx.shutdown_timeout.as_secs(),
1541 "signal received, starting graceful shutdown"
1542 );
1543 let listener_in_flight = listener
1544 .trigger_metrics()
1545 .into_values()
1546 .map(|metrics| metrics.in_flight)
1547 .sum::<u64>();
1548 let dispatcher_before = dispatcher.snapshot();
1549 append_lifecycle_event(
1550 ctx.event_log,
1551 "draining",
1552 json!({
1553 "bind": ctx.bind.to_string(),
1554 "role": ctx.role.as_str(),
1555 "status": "draining",
1556 "http_in_flight": listener_in_flight,
1557 "dispatcher_in_flight": dispatcher_before.in_flight,
1558 "dispatcher_retry_queue_depth": dispatcher_before.retry_queue_depth,
1559 "dispatcher_dlq_depth": dispatcher_before.dlq_depth,
1560 "shutdown_timeout_secs": ctx.shutdown_timeout.as_secs(),
1561 "drain_max_items": ctx.drain_config.max_items,
1562 "drain_deadline_secs": ctx.drain_config.deadline.as_secs(),
1563 }),
1564 )
1565 .await?;
1566
1567 let deadline = tokio::time::Instant::now() + ctx.shutdown_timeout;
1568 let listener_metrics = listener.shutdown(remaining_budget(deadline)).await?;
1569 for handle in &ctx.connectors.handles {
1570 let connector = handle.lock().await;
1571 if let Err(error) = connector.shutdown(remaining_budget(deadline)).await {
1572 eprintln!(
1573 "[harn] connector {} shutdown warning: {error}",
1574 connector.provider_id().as_str()
1575 );
1576 }
1577 }
1578
1579 let mut pending_processed = 0;
1580 for (topic_name, pump) in pending_pumps {
1581 let stats =
1582 drain_pump_best_effort(ctx.event_log, &topic_name, pump, ctx.drain_config, deadline)
1583 .await?;
1584 pending_processed += stats.stats.processed;
1585 emit_drain_truncated(ctx.event_log, &topic_name, stats, ctx.drain_config).await?;
1586 }
1587 let cron_stats = drain_pump_best_effort(
1588 ctx.event_log,
1589 CRON_TICK_TOPIC,
1590 cron_pump,
1591 ctx.drain_config,
1592 deadline,
1593 )
1594 .await?;
1595 emit_drain_truncated(ctx.event_log, CRON_TICK_TOPIC, cron_stats, ctx.drain_config).await?;
1596 let mut inbox_processed = 0;
1597 for (topic_name, pump) in inbox_pumps {
1598 let stats =
1599 drain_pump_best_effort(ctx.event_log, &topic_name, pump, ctx.drain_config, deadline)
1600 .await?;
1601 inbox_processed += stats.stats.processed;
1602 emit_drain_truncated(ctx.event_log, &topic_name, stats, ctx.drain_config).await?;
1603 }
1604 let waitpoint_stats = waitpoint_pump
1605 .drain(
1606 ctx.event_log,
1607 harn_vm::WAITPOINT_RESUME_TOPIC,
1608 topic_latest_id(ctx.event_log, harn_vm::WAITPOINT_RESUME_TOPIC).await?,
1609 ctx.drain_config,
1610 deadline,
1611 )
1612 .await?;
1613 emit_drain_truncated(
1614 ctx.event_log,
1615 harn_vm::WAITPOINT_RESUME_TOPIC,
1616 waitpoint_stats,
1617 ctx.drain_config,
1618 )
1619 .await?;
1620 let waitpoint_cancel_stats = waitpoint_cancel_pump
1621 .drain(
1622 ctx.event_log,
1623 harn_vm::TRIGGER_CANCEL_REQUESTS_TOPIC,
1624 topic_latest_id(ctx.event_log, harn_vm::TRIGGER_CANCEL_REQUESTS_TOPIC).await?,
1625 ctx.drain_config,
1626 deadline,
1627 )
1628 .await?;
1629 emit_drain_truncated(
1630 ctx.event_log,
1631 harn_vm::TRIGGER_CANCEL_REQUESTS_TOPIC,
1632 waitpoint_cancel_stats,
1633 ctx.drain_config,
1634 )
1635 .await?;
1636 waitpoint_sweeper.shutdown().await?;
1637 let drain_report = dispatcher
1638 .drain(remaining_budget(deadline))
1639 .await
1640 .map_err(|error| format!("failed to drain dispatcher: {error}"))?;
1641
1642 let stopped_at = now_rfc3339()?;
1643 let timed_out = !drain_report.drained;
1644 if timed_out {
1645 dispatcher.shutdown();
1646 }
1647 append_lifecycle_event(
1648 ctx.event_log,
1649 "stopped",
1650 json!({
1651 "bind": ctx.bind.to_string(),
1652 "role": ctx.role.as_str(),
1653 "status": "stopped",
1654 "http_in_flight": listener_in_flight,
1655 "dispatcher_in_flight": drain_report.in_flight,
1656 "dispatcher_retry_queue_depth": drain_report.retry_queue_depth,
1657 "dispatcher_dlq_depth": drain_report.dlq_depth,
1658 "pending_events_drained": pending_processed,
1659 "cron_events_drained": cron_stats.stats.processed,
1660 "inbox_events_drained": inbox_processed,
1661 "waitpoint_events_drained": waitpoint_stats.stats.processed,
1662 "waitpoint_cancel_events_drained": waitpoint_cancel_stats.stats.processed,
1663 "timed_out": timed_out,
1664 }),
1665 )
1666 .await?;
1667 ctx.event_log
1668 .flush()
1669 .await
1670 .map_err(|error| format!("failed to flush event log: {error}"))?;
1671
1672 write_state_snapshot(
1673 &ctx.state_dir.join(STATE_SNAPSHOT_FILE),
1674 &ServeStateSnapshot {
1675 status: "stopped".to_string(),
1676 role: ctx.role.as_str().to_string(),
1677 bind: ctx.bind.to_string(),
1678 listener_url: ctx.listener_url.clone(),
1679 manifest_path: ctx.config_path.display().to_string(),
1680 state_dir: ctx.state_dir.display().to_string(),
1681 started_at: ctx.startup_started_at.to_string(),
1682 stopped_at: Some(stopped_at),
1683 secret_provider_chain: ctx.secret_chain_display.to_string(),
1684 event_log_backend: ctx.event_log_description.backend.to_string(),
1685 event_log_location: ctx
1686 .event_log_description
1687 .location
1688 .as_ref()
1689 .map(|path| path.display().to_string()),
1690 triggers: trigger_state_snapshots(ctx.triggers, &listener_metrics),
1691 connectors: ctx.connectors.providers.clone(),
1692 activations: ctx
1693 .connectors
1694 .activations
1695 .iter()
1696 .map(|activation| ConnectorActivationSnapshot {
1697 provider: activation.provider.as_str().to_string(),
1698 binding_count: activation.binding_count,
1699 })
1700 .collect(),
1701 },
1702 )?;
1703
1704 if timed_out {
1705 eprintln!(
1706 "[harn] graceful shutdown timed out with {} dispatches and {} retry waits remaining",
1707 drain_report.in_flight, drain_report.retry_queue_depth
1708 );
1709 }
1710 eprintln!("[harn] graceful shutdown complete");
1711 tracing::info!(
1712 component = "orchestrator",
1713 trace_id = "",
1714 "graceful shutdown complete"
1715 );
1716 Ok(())
1717}
1718
1719struct GracefulShutdownCtx<'a> {
1720 role: OrchestratorRole,
1721 bind: SocketAddr,
1722 listener_url: String,
1723 config_path: &'a Path,
1724 state_dir: &'a Path,
1725 startup_started_at: &'a str,
1726 event_log: &'a Arc<AnyEventLog>,
1727 event_log_description: &'a harn_vm::event_log::EventLogDescription,
1728 secret_chain_display: &'a str,
1729 triggers: &'a [CollectedManifestTrigger],
1730 connectors: &'a ConnectorRuntime,
1731 shutdown_timeout: Duration,
1732 drain_config: DrainConfig,
1733}
1734
1735struct RuntimeCtx<'a> {
1738 role: OrchestratorRole,
1739 config_path: &'a Path,
1740 state_dir: &'a Path,
1741 bind: SocketAddr,
1742 startup_started_at: &'a str,
1743 event_log: &'a Arc<AnyEventLog>,
1744 event_log_description: &'a harn_vm::event_log::EventLogDescription,
1745 secret_chain_display: &'a str,
1746 listener: &'a ListenerRuntime,
1747 connectors: &'a mut ConnectorRuntime,
1748 live_manifest: &'a mut Manifest,
1749 live_triggers: &'a mut Vec<CollectedManifestTrigger>,
1750 secret_provider: &'a Arc<dyn harn_vm::secrets::SecretProvider>,
1751 metrics_registry: &'a Arc<harn_vm::MetricsRegistry>,
1752 mcp_service: Option<&'a Arc<crate::commands::mcp::serve::McpOrchestratorService>>,
1753 clock: Arc<dyn Clock>,
1754 #[cfg_attr(not(unix), allow(dead_code))]
1755 reload_rx: &'a mut mpsc::UnboundedReceiver<AdminReloadRequest>,
1756}
1757
1758#[cfg_attr(not(unix), allow(dead_code))]
1759async fn handle_reload_request(
1760 ctx: &mut RuntimeCtx<'_>,
1761 request: AdminReloadRequest,
1762) -> Result<(), OrchestratorError> {
1763 let source = request.source.clone();
1764 match reload_manifest(ctx).await {
1765 Ok(summary) => {
1766 if let Some(mcp_service) = ctx.mcp_service {
1767 mcp_service.notify_manifest_reloaded();
1768 }
1769 write_running_state_snapshot(ctx)?;
1770 append_manifest_event(
1771 ctx.event_log,
1772 "reload_succeeded",
1773 json!({
1774 "source": source,
1775 "summary": summary,
1776 }),
1777 )
1778 .await?;
1779 eprintln!(
1780 "[harn] manifest reload ({source}) applied: +{} ~{} -{}",
1781 summary.added.len(),
1782 summary.modified.len(),
1783 summary.removed.len()
1784 );
1785 if let Some(response_tx) = request.response_tx {
1786 let _ = response_tx.send(serde_json::to_value(&summary).map_err(|error| {
1787 OrchestratorError::Serve(format!("failed to encode reload summary: {error}"))
1788 }));
1789 }
1790 }
1791 Err(error) => {
1792 eprintln!("[harn] manifest reload ({source}) failed: {error}");
1793 append_manifest_event(
1794 ctx.event_log,
1795 "reload_failed",
1796 json!({
1797 "source": source,
1798 "error": error.to_string(),
1799 }),
1800 )
1801 .await?;
1802 if let Some(response_tx) = request.response_tx {
1803 let _ = response_tx.send(Err(error));
1804 }
1805 }
1806 }
1807 Ok(())
1808}
1809
1810#[cfg_attr(not(unix), allow(dead_code))]
1811fn write_running_state_snapshot(ctx: &RuntimeCtx<'_>) -> Result<(), OrchestratorError> {
1812 let listener_metrics = ctx.listener.trigger_metrics();
1813 write_state_snapshot(
1814 &ctx.state_dir.join(STATE_SNAPSHOT_FILE),
1815 &ServeStateSnapshot {
1816 status: "running".to_string(),
1817 role: ctx.role.as_str().to_string(),
1818 bind: ctx.bind.to_string(),
1819 listener_url: ctx.listener.url(),
1820 manifest_path: ctx.config_path.display().to_string(),
1821 state_dir: ctx.state_dir.display().to_string(),
1822 started_at: ctx.startup_started_at.to_string(),
1823 stopped_at: None,
1824 secret_provider_chain: ctx.secret_chain_display.to_string(),
1825 event_log_backend: ctx.event_log_description.backend.to_string(),
1826 event_log_location: ctx
1827 .event_log_description
1828 .location
1829 .as_ref()
1830 .map(|path| path.display().to_string()),
1831 triggers: trigger_state_snapshots(ctx.live_triggers, &listener_metrics),
1832 connectors: ctx.connectors.providers.clone(),
1833 activations: ctx
1834 .connectors
1835 .activations
1836 .iter()
1837 .map(|activation| ConnectorActivationSnapshot {
1838 provider: activation.provider.as_str().to_string(),
1839 binding_count: activation.binding_count,
1840 })
1841 .collect(),
1842 },
1843 )
1844}
1845
1846#[cfg_attr(not(unix), allow(dead_code))]
1847async fn reload_manifest(
1848 ctx: &mut RuntimeCtx<'_>,
1849) -> Result<ManifestReloadSummary, OrchestratorError> {
1850 let (manifest, manifest_dir) = load_manifest(ctx.config_path)?;
1851 let mut vm = ctx
1852 .role
1853 .build_vm(&manifest_dir, &manifest_dir, ctx.state_dir)?;
1854 let extensions = package::load_runtime_extensions(ctx.config_path);
1855 let collected_triggers = package::collect_manifest_triggers(&mut vm, &extensions)
1856 .await
1857 .map_err(|error| format!("failed to collect manifest triggers: {error}"))?;
1858 let summary = summarize_manifest_reload(ctx.live_triggers, &collected_triggers);
1859 let connector_reload =
1860 connector_reload_fingerprint_map(ctx.live_triggers, &ctx.connectors.provider_overrides)
1861 != connector_reload_fingerprint_map(
1862 &collected_triggers,
1863 &extensions.provider_connectors,
1864 );
1865 let next_connector_runtime = if connector_reload {
1866 let mut runtime = initialize_connectors(
1867 &collected_triggers,
1868 ctx.event_log.clone(),
1869 ctx.secret_provider.clone(),
1870 ctx.metrics_registry.clone(),
1871 &extensions.provider_connectors,
1872 ctx.clock.clone(),
1873 )
1874 .await?;
1875 runtime.activations = runtime
1876 .registry
1877 .activate_all(&runtime.trigger_registry)
1878 .await
1879 .map_err(|error| error.to_string())?;
1880 Some(runtime)
1881 } else {
1882 None
1883 };
1884 let previous_manifest = ctx.live_manifest.clone();
1885 let previous_triggers = ctx.live_triggers.clone();
1886 package::install_collected_manifest_triggers(&collected_triggers).await?;
1887 apply_supervisor_state(ctx.state_dir).await?;
1888 let binding_versions = live_manifest_binding_versions();
1889 let route_registry = next_connector_runtime
1890 .as_ref()
1891 .map(|runtime| &runtime.registry)
1892 .unwrap_or(&ctx.connectors.registry);
1893 let route_overrides = next_connector_runtime
1894 .as_ref()
1895 .map(|runtime| runtime.provider_overrides.as_slice())
1896 .unwrap_or(ctx.connectors.provider_overrides.as_slice());
1897 let route_configs = match build_route_configs(&collected_triggers, &binding_versions)
1898 .and_then(|routes| attach_route_connectors(routes, route_registry, route_overrides))
1899 {
1900 Ok(routes) => routes,
1901 Err(error) => {
1902 rollback_manifest_reload(ctx, &previous_manifest, &previous_triggers)
1903 .await
1904 .map_err(|rollback| format!("{error}; rollback failed: {rollback}"))?;
1905 return Err(error);
1906 }
1907 };
1908 if let Err(error) = ctx.listener.reload_routes(route_configs) {
1909 rollback_manifest_reload(ctx, &previous_manifest, &previous_triggers)
1910 .await
1911 .map_err(|rollback| format!("{error}; rollback failed: {rollback}"))?;
1912 return Err(error);
1913 }
1914 if let Some(runtime) = next_connector_runtime {
1915 let previous_handles = ctx.connectors.handles.clone();
1916 let connector_clients = runtime.registry.client_map().await;
1917 harn_vm::install_active_connector_clients(connector_clients);
1918 *ctx.connectors = runtime;
1919 for handle in previous_handles {
1920 let connector = handle.lock().await;
1921 if let Err(error) = connector.shutdown(Duration::from_secs(5)).await {
1922 eprintln!(
1923 "[harn] connector {} reload shutdown warning: {error}",
1924 connector.provider_id().as_str()
1925 );
1926 }
1927 }
1928 }
1929 *ctx.live_manifest = manifest;
1930 *ctx.live_triggers = collected_triggers;
1931 Ok(summary)
1932}
1933
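/// Restores the previously installed triggers, supervisor state, and listener
/// routes after a failed reload, then points the live manifest and trigger set
/// back at the prior versions.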
1934#[cfg_attr(not(unix), allow(dead_code))]
1935async fn rollback_manifest_reload(
1936 ctx: &mut RuntimeCtx<'_>,
1937 previous_manifest: &Manifest,
1938 previous_triggers: &[CollectedManifestTrigger],
1939) -> Result<(), OrchestratorError> {
1940 package::install_collected_manifest_triggers(previous_triggers).await?;
1941 apply_supervisor_state(ctx.state_dir).await?;
1942 let binding_versions = live_manifest_binding_versions();
1943 let route_configs = build_route_configs(previous_triggers, &binding_versions)?;
1944 let route_configs = attach_route_connectors(
1945 route_configs,
1946 &ctx.connectors.registry,
1947 &ctx.connectors.provider_overrides,
1948 )?;
1949 ctx.listener.reload_routes(route_configs)?;
1950 *ctx.live_manifest = previous_manifest.clone();
1951 *ctx.live_triggers = previous_triggers.to_vec();
1952 Ok(())
1953}
1954
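/// Diffs the current and next trigger sets by definition fingerprint, bucketing
/// every trigger id into added, removed, modified, or unchanged.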
1955#[cfg_attr(not(unix), allow(dead_code))]
1956fn summarize_manifest_reload(
1957 current: &[CollectedManifestTrigger],
1958 next: &[CollectedManifestTrigger],
1959) -> ManifestReloadSummary {
1960 let current_map = trigger_fingerprint_map(current, true);
1961 let next_map = trigger_fingerprint_map(next, true);
1962 let mut summary = ManifestReloadSummary::default();
1963 let ids: BTreeSet<String> = current_map.keys().chain(next_map.keys()).cloned().collect();
1964 for id in ids {
1965 match (current_map.get(&id), next_map.get(&id)) {
1966 (None, Some(_)) => summary.added.push(id),
1967 (Some(_), None) => summary.removed.push(id),
1968 (Some(left), Some(right)) if left == right => summary.unchanged.push(id),
1969 (Some(_), Some(_)) => summary.modified.push(id),
1970 (None, None) => {}
1971 }
1972 }
1973 summary
1974}
1975
1976#[cfg_attr(not(unix), allow(dead_code))]
1977fn trigger_fingerprint_map(
1978 triggers: &[CollectedManifestTrigger],
1979 include_http_managed: bool,
1980) -> BTreeMap<String, String> {
1981 triggers
1982 .iter()
1983 .filter(|trigger| include_http_managed || !is_http_managed_trigger(trigger))
1984 .map(|trigger| {
1985 let spec = package::manifest_trigger_binding_spec(trigger.clone());
1986 (trigger.config.id.clone(), spec.definition_fingerprint)
1987 })
1988 .collect()
1989}
1990
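/// Builds the per-provider fingerprint map used to decide whether a reload must
/// rebuild the connector runtime. Webhook and a2a-push triggers only count when
/// a Harn connector owns ingress for their provider, and every provider
/// connector override contributes its own fingerprint.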
1991#[cfg_attr(not(unix), allow(dead_code))]
1992fn connector_reload_fingerprint_map(
1993 triggers: &[CollectedManifestTrigger],
1994 provider_overrides: &[ResolvedProviderConnectorConfig],
1995) -> BTreeMap<String, Vec<String>> {
1996 let mut by_provider = BTreeMap::<String, Vec<String>>::new();
1997 for trigger in triggers {
1998 let provider = trigger.config.provider.as_str().to_string();
1999 if !connector_owns_ingress(&provider, provider_overrides)
2000 && matches!(
2001 trigger.config.kind,
2002 crate::package::TriggerKind::Webhook | crate::package::TriggerKind::A2aPush
2003 )
2004 {
2005 continue;
2006 }
2007 let spec = package::manifest_trigger_binding_spec(trigger.clone());
2008 by_provider
2009 .entry(provider)
2010 .or_default()
2011 .push(spec.definition_fingerprint);
2012 }
2013 for override_config in provider_overrides {
2014 by_provider
2015 .entry(override_config.id.as_str().to_string())
2016 .or_default()
2017 .push(provider_connector_fingerprint(override_config));
2018 }
2019 for fingerprints in by_provider.values_mut() {
2020 fingerprints.sort();
2021 }
2022 by_provider
2023}
2024
2025#[cfg_attr(not(unix), allow(dead_code))]
2026fn provider_connector_fingerprint(config: &ResolvedProviderConnectorConfig) -> String {
2027 match &config.connector {
2028 ResolvedProviderConnectorKind::RustBuiltin => format!(
2029 "{}::builtin@{}",
2030 config.id.as_str(),
2031 config.manifest_dir.display()
2032 ),
2033 ResolvedProviderConnectorKind::Harn { module } => format!(
2034 "{}::harn:{}@{}",
2035 config.id.as_str(),
2036 module,
2037 config.manifest_dir.display()
2038 ),
2039 ResolvedProviderConnectorKind::Invalid(message) => format!(
2040 "{}::invalid:{}@{}",
2041 config.id.as_str(),
2042 message,
2043 config.manifest_dir.display()
2044 ),
2045 }
2046}
2047
2048#[cfg_attr(not(unix), allow(dead_code))]
2049fn is_http_managed_trigger(trigger: &CollectedManifestTrigger) -> bool {
2050 matches!(
2051 trigger.config.kind,
2052 crate::package::TriggerKind::Webhook | crate::package::TriggerKind::A2aPush
2053 )
2054}
2055
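/// Groups manifest triggers by provider, registers a connector for each one
/// (a manifest override, a builtin such as cron, or a placeholder), rejects
/// package-backed providers that lack a Harn connector override, and
/// initializes every connector with a shared context before returning the
/// assembled runtime.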
2056async fn initialize_connectors(
2059 triggers: &[CollectedManifestTrigger],
2060 event_log: Arc<harn_vm::event_log::AnyEventLog>,
2061 secrets: Arc<dyn harn_vm::secrets::SecretProvider>,
2062 metrics: Arc<harn_vm::MetricsRegistry>,
2063 provider_overrides: &[ResolvedProviderConnectorConfig],
2064 clock: Arc<dyn Clock>,
2065) -> Result<ConnectorRuntime, OrchestratorError> {
2066 let mut registry = harn_vm::ConnectorRegistry::default();
2067 let mut trigger_registry = harn_vm::TriggerRegistry::default();
2068 let mut grouped_kinds: BTreeMap<harn_vm::ProviderId, BTreeSet<String>> = BTreeMap::new();
2069
2070 for trigger in triggers {
2071 let binding = trigger_binding_for(&trigger.config)?;
2072 grouped_kinds
2073 .entry(binding.provider.clone())
2074 .or_default()
2075 .insert(binding.kind.as_str().to_string());
2076 trigger_registry.register(binding);
2077 }
2078
2079 let ctx = harn_vm::ConnectorCtx {
2080 inbox: Arc::new(
2081 harn_vm::InboxIndex::new(event_log.clone(), metrics.clone())
2082 .await
2083 .map_err(|error| error.to_string())?,
2084 ),
2085 event_log,
2086 secrets,
2087 metrics,
2088 rate_limiter: Arc::new(harn_vm::RateLimiterFactory::default()),
2089 };
2090
2091 let mut providers = Vec::new();
2092 let mut handles = Vec::new();
2093 for (provider, kinds) in grouped_kinds {
2094 let provider_name = provider.as_str().to_string();
2095 if let Some(connector) = connector_override_for(&provider, provider_overrides).await? {
2096 registry.remove(&provider);
2097 registry
2098 .register(connector)
2099 .map_err(|error| error.to_string())?;
2100 }
2101 if registry.get(&provider).is_none() {
2102 if provider_requires_harn_connector(provider.as_str()) {
2103 return Err(format!(
2104 "provider '{}' is package-backed; add [[providers]] id = \"{}\" with \
2105 connector = {{ harn = \"...\" }} to the manifest",
2106 provider.as_str(),
2107 provider.as_str()
2108 )
2109 .into());
2110 }
2111 let connector = connector_for(&provider, kinds, clock.clone());
2112 registry
2113 .register(connector)
2114 .map_err(|error| error.to_string())?;
2115 }
2116 let handle = registry
2117 .get(&provider)
2118 .ok_or_else(|| format!("connector registry lost provider '{}'", provider.as_str()))?;
2119 handle
2120 .lock()
2121 .await
2122 .init(ctx.clone())
2123 .await
2124 .map_err(|error| error.to_string())?;
2125 handles.push(handle.clone());
2126 providers.push(provider_name);
2127 }
2128
2129 Ok(ConnectorRuntime {
2130 registry,
2131 trigger_registry,
2132 handles,
2133 providers,
2134 activations: Vec::new(),
2135 provider_overrides: provider_overrides.to_vec(),
2136 })
2137}
2138
2139fn trigger_binding_for(
2140 config: &ResolvedTriggerConfig,
2141) -> Result<harn_vm::TriggerBinding, OrchestratorError> {
2142 Ok(harn_vm::TriggerBinding {
2143 provider: config.provider.clone(),
2144 kind: harn_vm::TriggerKind::from(trigger_kind_name(config.kind)),
2145 binding_id: config.id.clone(),
2146 dedupe_key: config.dedupe_key.clone(),
2147 dedupe_retention_days: config.retry.retention_days,
2148 config: connector_binding_config(config)?,
2149 })
2150}
2151
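/// Encodes the connector-facing config for a trigger: cron passes its
/// kind-specific table through as-is, webhook/poll/stream/a2a-push wrap their
/// match and secrets sections (stream also carries its window), and any other
/// kind gets a null config.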
2152fn connector_binding_config(
2153 config: &ResolvedTriggerConfig,
2154) -> Result<JsonValue, OrchestratorError> {
2155 match config.kind {
2156 crate::package::TriggerKind::Cron => {
2157 serde_json::to_value(&config.kind_specific).map_err(|error| {
OrchestratorError::Serve(format!(
"failed to encode cron trigger config '{}': {error}",
config.id
))
2164 })
2165 }
2166 crate::package::TriggerKind::Webhook => Ok(serde_json::json!({
2167 "match": config.match_,
2168 "secrets": config.secrets,
2169 "webhook": config.kind_specific,
2170 })),
2171 crate::package::TriggerKind::Poll => Ok(serde_json::json!({
2172 "match": config.match_,
2173 "secrets": config.secrets,
2174 "poll": config.kind_specific,
2175 })),
2176 crate::package::TriggerKind::Stream => Ok(serde_json::json!({
2177 "match": config.match_,
2178 "secrets": config.secrets,
2179 "stream": config.kind_specific,
2180 "window": config.window,
2181 })),
2182 crate::package::TriggerKind::A2aPush => Ok(serde_json::json!({
2183 "match": config.match_,
2184 "secrets": config.secrets,
2185 "a2a_push": a2a_push_connector_config(&config.kind_specific)?,
2186 })),
2187 _ => Ok(JsonValue::Null),
2188 }
2189}
2190
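/// Extracts the a2a_push connector payload from the trigger's kind-specific
/// table, preferring a nested `a2a_push` value and otherwise forwarding every
/// key except `path`.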
2191fn a2a_push_connector_config(
2192 kind_specific: &BTreeMap<String, toml::Value>,
2193) -> Result<JsonValue, OrchestratorError> {
2194 if let Some(nested) = kind_specific.get("a2a_push") {
2195 return serde_json::to_value(nested).map_err(|error| {
2196 OrchestratorError::Serve(format!("failed to encode a2a_push trigger config: {error}"))
2197 });
2198 }
2199 let filtered = kind_specific
2200 .iter()
2201 .filter(|(key, _)| key.as_str() != "path")
2202 .map(|(key, value)| (key.clone(), value.clone()))
2203 .collect::<BTreeMap<_, _>>();
2204 serde_json::to_value(filtered).map_err(|error| {
2205 OrchestratorError::Serve(format!("failed to encode a2a_push trigger config: {error}"))
2206 })
2207}
2208
2209fn connector_for(
2210 provider: &harn_vm::ProviderId,
2211 kinds: BTreeSet<String>,
2212 clock: Arc<dyn Clock>,
2213) -> Box<dyn harn_vm::Connector> {
2214 match provider.as_str() {
2215 "cron" => Box::new(harn_vm::CronConnector::with_clock(clock)),
2216 _ => Box::new(PlaceholderConnector::new(provider.clone(), kinds)),
2217 }
2218}
2219
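/// Resolves the manifest connector override for a provider: builtin overrides
/// defer to the default connector, invalid overrides surface their error, and
/// Harn overrides load the connector module relative to the manifest directory.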
2220async fn connector_override_for(
2221 provider: &harn_vm::ProviderId,
2222 provider_overrides: &[ResolvedProviderConnectorConfig],
2223) -> Result<Option<Box<dyn harn_vm::Connector>>, OrchestratorError> {
2224 let Some(override_config) = provider_overrides
2225 .iter()
2226 .find(|entry| entry.id == *provider)
2227 else {
2228 return Ok(None);
2229 };
2230 match &override_config.connector {
2231 ResolvedProviderConnectorKind::RustBuiltin => Ok(None),
2232 ResolvedProviderConnectorKind::Invalid(message) => {
2233 Err(OrchestratorError::Serve(message.clone()))
2234 }
2235 ResolvedProviderConnectorKind::Harn { module } => {
2236 let module_path =
2237 harn_vm::resolve_module_import_path(&override_config.manifest_dir, module);
2238 let connector = harn_vm::HarnConnector::load(&module_path)
2239 .await
2240 .map_err(|error| {
2241 format!(
2242 "failed to load Harn connector '{}' for provider '{}': {error}",
2243 module_path.display(),
2244 provider.as_str()
2245 )
2246 })?;
2247 Ok(Some(Box::new(connector)))
2248 }
2249 }
2250}
2251
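/// Builds listener route configs for the collected triggers, requiring an
/// active binding version for each trigger id and rejecting duplicate paths.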
2252fn build_route_configs(
2253 triggers: &[CollectedManifestTrigger],
2254 binding_versions: &BTreeMap<String, u32>,
2255) -> Result<Vec<RouteConfig>, OrchestratorError> {
2256 let mut seen_paths = BTreeSet::new();
2257 let mut routes = Vec::new();
2258 for trigger in triggers {
2259 let Some(binding_version) = binding_versions.get(&trigger.config.id).copied() else {
2260 return Err(format!(
2261 "trigger registry is missing active manifest binding '{}'",
2262 trigger.config.id
2263 )
2264 .into());
2265 };
2266 if let Some(route) = RouteConfig::from_trigger(trigger, binding_version)? {
2267 if !seen_paths.insert(route.path.clone()) {
2268 return Err(format!(
2269 "trigger route '{}' is configured more than once",
2270 route.path
2271 )
2272 .into());
2273 }
2274 routes.push(route);
2275 }
2276 }
2277 Ok(routes)
2278}
2279
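/// Attaches a connector handle to every route whose provider owns ingress,
/// either because the route asks for it or because a Harn connector override
/// is configured for that provider.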
2280fn attach_route_connectors(
2281 routes: Vec<RouteConfig>,
2282 registry: &harn_vm::ConnectorRegistry,
2283 provider_overrides: &[ResolvedProviderConnectorConfig],
2284) -> Result<Vec<RouteConfig>, OrchestratorError> {
2285 routes
2286 .into_iter()
2287 .map(|mut route| {
2288 if route.connector_ingress
2289 || connector_owns_ingress(route.provider.as_str(), provider_overrides)
2290 {
2291 route.connector = Some(registry.get(&route.provider).ok_or_else(|| {
2292 format!(
2293 "connector registry is missing provider '{}'",
2294 route.provider.as_str()
2295 )
2296 })?);
2297 }
2298 Ok(route)
2299 })
2300 .collect()
2301}
2302
2303fn connector_owns_ingress(
2304 provider: &str,
2305 provider_overrides: &[ResolvedProviderConnectorConfig],
2306) -> bool {
2307 provider_overrides.iter().any(|entry| {
2308 entry.id.as_str() == provider
2309 && matches!(entry.connector, ResolvedProviderConnectorKind::Harn { .. })
2310 })
2311}
2312
2313fn provider_requires_harn_connector(provider: &str) -> bool {
2314 harn_vm::provider_metadata(provider).is_some_and(|metadata| {
2315 matches!(
2316 metadata.runtime,
2317 harn_vm::ProviderRuntimeMetadata::Placeholder
2318 )
2319 })
2320}
2321
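/// Returns the highest non-terminated binding version for every manifest-sourced
/// trigger currently registered in the VM.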
2322fn live_manifest_binding_versions() -> BTreeMap<String, u32> {
2323 let mut versions = BTreeMap::new();
2324 for binding in harn_vm::snapshot_trigger_bindings() {
2325 if binding.source != harn_vm::TriggerBindingSource::Manifest {
2326 continue;
2327 }
2328 if binding.state == harn_vm::TriggerState::Terminated {
2329 continue;
2330 }
2331 versions
2332 .entry(binding.id)
2333 .and_modify(|current: &mut u32| *current = (*current).max(binding.version))
2334 .or_insert(binding.version);
2335 }
2336 versions
2337}
2338
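/// Joins the newest manifest binding snapshot for each trigger with the
/// listener's per-trigger counters to produce the rows written into the state
/// snapshot.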
2339fn trigger_state_snapshots(
2340 triggers: &[CollectedManifestTrigger],
2341 listener_metrics: &BTreeMap<String, TriggerMetricSnapshot>,
2342) -> Vec<TriggerStateSnapshot> {
2343 let bindings_by_id = harn_vm::snapshot_trigger_bindings()
2344 .into_iter()
2345 .filter(|binding| binding.source == harn_vm::TriggerBindingSource::Manifest)
2346 .fold(
2347 BTreeMap::<String, harn_vm::TriggerBindingSnapshot>::new(),
2348 |mut acc, binding| {
2349 match acc.get(&binding.id) {
2350 Some(current) if current.version >= binding.version => {}
2351 _ => {
2352 acc.insert(binding.id.clone(), binding);
2353 }
2354 }
2355 acc
2356 },
2357 );
2358
2359 triggers
2360 .iter()
2361 .map(|trigger| {
2362 let runtime = bindings_by_id.get(&trigger.config.id);
2363 let metrics = listener_metrics.get(&trigger.config.id);
2364 TriggerStateSnapshot {
2365 id: trigger.config.id.clone(),
2366 provider: trigger.config.provider.as_str().to_string(),
2367 kind: trigger_kind_name(trigger.config.kind).to_string(),
2368 handler: handler_kind(&trigger.handler).to_string(),
2369 version: runtime.map(|binding| binding.version),
2370 state: runtime.map(|binding| binding.state.as_str().to_string()),
2371 received: metrics.map(|value| value.received).unwrap_or(0),
2372 dispatched: metrics.map(|value| value.dispatched).unwrap_or(0),
2373 failed: metrics.map(|value| value.failed).unwrap_or(0),
2374 in_flight: metrics.map(|value| value.in_flight).unwrap_or(0),
2375 }
2376 })
2377 .collect()
2378}
2379
2380fn format_trigger_summary(triggers: &[CollectedManifestTrigger]) -> String {
2383 if triggers.is_empty() {
2384 return "none".to_string();
2385 }
2386 triggers
2387 .iter()
2388 .map(|trigger| {
2389 format!(
2390 "{} [{}:{} -> {}]",
2391 trigger.config.id,
2392 trigger.config.provider.as_str(),
2393 trigger_kind_name(trigger.config.kind),
2394 handler_kind(&trigger.handler)
2395 )
2396 })
2397 .collect::<Vec<_>>()
2398 .join(", ")
2399}
2400
2401fn format_activation_summary(activations: &[harn_vm::ActivationHandle]) -> String {
2402 if activations.is_empty() {
2403 return "none".to_string();
2404 }
2405 activations
2406 .iter()
2407 .map(|activation| {
2408 format!(
2409 "{}({})",
2410 activation.provider.as_str(),
2411 activation.binding_count
2412 )
2413 })
2414 .collect::<Vec<_>>()
2415 .join(", ")
2416}
2417
2418fn handler_kind(handler: &CollectedTriggerHandler) -> &'static str {
2419 match handler {
2420 CollectedTriggerHandler::Local { .. } => "local",
2421 CollectedTriggerHandler::A2a { .. } => "a2a",
2422 CollectedTriggerHandler::Worker { .. } => "worker",
2423 CollectedTriggerHandler::Persona { .. } => "persona",
2424 }
2425}
2426
2427fn trigger_kind_name(kind: crate::package::TriggerKind) -> &'static str {
2428 match kind {
2429 crate::package::TriggerKind::Webhook => "webhook",
2430 crate::package::TriggerKind::Cron => "cron",
2431 crate::package::TriggerKind::Poll => "poll",
2432 crate::package::TriggerKind::Stream => "stream",
2433 crate::package::TriggerKind::Predicate => "predicate",
2434 crate::package::TriggerKind::A2aPush => "a2a-push",
2435 }
2436}
2437
2438fn has_orchestrator_api_keys_configured() -> bool {
2439 std::env::var("HARN_ORCHESTRATOR_API_KEYS")
2440 .ok()
2441 .is_some_and(|value| value.split(',').any(|segment| !segment.trim().is_empty()))
2442}
2443
2444fn has_mcp_oauth_configured() -> bool {
2445 std::env::var("HARN_MCP_OAUTH_AUTHORIZATION_SERVERS")
2446 .ok()
2447 .is_some_and(|value| value.split(',').any(|segment| !segment.trim().is_empty()))
2448}
2449
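/// Validates the embedded MCP paths: each must start with '/', must not be the
/// bare root or a reserved listener path, and all three must be distinct.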
2450fn validate_mcp_paths(
2451 path: &str,
2452 sse_path: &str,
2453 messages_path: &str,
2454) -> Result<(), OrchestratorError> {
2455 let reserved = [
2456 "/health",
2457 "/healthz",
2458 "/readyz",
2459 "/metrics",
2460 "/admin/reload",
2461 "/acp",
2462 ];
2463 let mut seen = BTreeSet::new();
2464 for (label, value) in [
2465 ("--mcp-path", path),
2466 ("--mcp-sse-path", sse_path),
2467 ("--mcp-messages-path", messages_path),
2468 ] {
2469 if !value.starts_with('/') {
2470 return Err(format!("{label} must start with '/'").into());
2471 }
2472 if value == "/" {
2473 return Err(format!("{label} cannot be '/'").into());
2474 }
2475 if reserved.contains(&value) {
2476 return Err(format!("{label} cannot use reserved listener path '{value}'").into());
2477 }
2478 if !seen.insert(value) {
2479 return Err(format!("embedded MCP paths must be unique; duplicate '{value}'").into());
2480 }
2481 }
2482 Ok(())
2483}
2484
2485async fn append_lifecycle_event(
2486 log: &Arc<AnyEventLog>,
2487 kind: &str,
2488 payload: JsonValue,
2489) -> Result<(), OrchestratorError> {
2490 let topic =
2491 harn_vm::event_log::Topic::new(LIFECYCLE_TOPIC).map_err(|error| error.to_string())?;
2492 log.append(&topic, harn_vm::event_log::LogEvent::new(kind, payload))
2493 .await
2494 .map(|_| ())
2495 .map_err(|error| {
2496 OrchestratorError::Serve(format!(
2497 "failed to append orchestrator lifecycle event: {error}"
2498 ))
2499 })
2500}
2501
2502async fn append_pump_lifecycle_event(
2503 log: &Arc<AnyEventLog>,
2504 kind: &str,
2505 payload: JsonValue,
2506) -> Result<(), OrchestratorError> {
2507 append_lifecycle_event(log, kind, payload).await
2508}
2509
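/// If the drain gate is raised, records a `pump_drain_waiting` lifecycle event
/// and then blocks until the gate drops or its sender goes away.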
2510async fn wait_for_pump_drain_release(
2511 log: &Arc<AnyEventLog>,
2512 topic: &harn_vm::event_log::Topic,
2513 event_id: u64,
2514 gate_rx: &mut watch::Receiver<bool>,
2515) -> Result<(), OrchestratorError> {
2516 if !*gate_rx.borrow() {
2517 return Ok(());
2518 }
2519 append_pump_lifecycle_event(
2520 log,
2521 "pump_drain_waiting",
2522 json!({
2523 "topic": topic.as_str(),
2524 "event_log_id": event_id,
2525 }),
2526 )
2527 .await?;
2528 while *gate_rx.borrow() {
2529 if gate_rx.changed().await.is_err() {
2530 break;
2531 }
2532 }
2533 Ok(())
2534}
2535
2536#[cfg_attr(not(unix), allow(dead_code))]
2537async fn append_manifest_event(
2538 log: &Arc<AnyEventLog>,
2539 kind: &str,
2540 payload: JsonValue,
2541) -> Result<(), OrchestratorError> {
2542 let topic =
2543 harn_vm::event_log::Topic::new(MANIFEST_TOPIC).map_err(|error| error.to_string())?;
2544 log.append(&topic, harn_vm::event_log::LogEvent::new(kind, payload))
2545 .await
2546 .map(|_| ())
2547 .map_err(|error| {
2548 OrchestratorError::Serve(format!(
2549 "failed to append orchestrator manifest event: {error}"
2550 ))
2551 })
2552}
2553
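/// Updates the pump outstanding and backlog gauges for a topic and appends a
/// `pump_metrics_recorded` lifecycle event with the computed values.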
2554async fn record_pump_metrics(
2555 metrics: &harn_vm::MetricsRegistry,
2556 log: &Arc<AnyEventLog>,
2557 topic: &harn_vm::event_log::Topic,
2558 last_seen: u64,
2559 outstanding: usize,
2560) -> Result<(), OrchestratorError> {
2561 let latest = log.latest(topic).await.ok().flatten().unwrap_or(last_seen);
2562 let backlog = latest.saturating_sub(last_seen);
2563 metrics.set_orchestrator_pump_outstanding(topic.as_str(), outstanding);
2564 metrics.set_orchestrator_pump_backlog(topic.as_str(), backlog);
2565 append_pump_lifecycle_event(
2566 log,
2567 "pump_metrics_recorded",
2568 json!({
2569 "topic": topic.as_str(),
2570 "latest": latest,
2571 "last_seen": last_seen,
2572 "backlog": backlog,
2573 "outstanding": outstanding,
2574 }),
2575 )
2576 .await
2577}
2578
2579fn admission_delay(occurred_at_ms: i64) -> Duration {
2580 let now = std::time::SystemTime::now()
2581 .duration_since(std::time::UNIX_EPOCH)
2582 .unwrap_or_default()
2583 .as_millis() as i64;
2584 Duration::from_millis(now.saturating_sub(occurred_at_ms).max(0) as u64)
2585}
2586
2587async fn emit_drain_truncated(
2588 log: &Arc<AnyEventLog>,
2589 topic_name: &str,
2590 report: PumpDrainReport,
2591 config: DrainConfig,
2592) -> Result<(), OrchestratorError> {
2593 if !report.truncated() {
2594 return Ok(());
2595 }
2596 eprintln!(
2597 "[harn] warning: pump drain truncated for {topic_name}: remaining_queued={} drain_items={} reason={}",
2598 report.remaining_queued,
2599 report.drain_items,
2600 report.stop_reason.as_str()
2601 );
2602 append_lifecycle_event(
2603 log,
2604 "drain_truncated",
2605 json!({
2606 "topic": topic_name,
2607 "remaining_queued": report.remaining_queued,
2608 "drain_items": report.drain_items,
2609 "outstanding_tasks": report.outstanding_tasks,
2610 "max_items": config.max_items,
2611 "deadline_secs": config.deadline.as_secs(),
2612 "reason": report.stop_reason.as_str(),
2613 }),
2614 )
2615 .await
2616}
2617
2618async fn topic_latest_id(
2619 log: &Arc<AnyEventLog>,
2620 topic_name: &str,
2621) -> Result<u64, OrchestratorError> {
2622 let topic = harn_vm::event_log::Topic::new(topic_name).map_err(|error| error.to_string())?;
2623 log.latest(&topic)
2624 .await
2625 .map(|value| value.unwrap_or(0))
2626 .map_err(|error| {
2627 OrchestratorError::Serve(format!(
2628 "failed to read topic head for {topic_name}: {error}"
2629 ))
2630 })
2631}
2632
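/// Drains the pump up to the current topic head within the remaining shutdown
/// budget; if the drain errors or times out, a best-effort report is derived
/// from the consumer cursor so shutdown can continue.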
2633async fn drain_pump_best_effort(
2634 log: &Arc<AnyEventLog>,
2635 topic_name: &str,
2636 pump: PumpHandle,
2637 config: DrainConfig,
2638 overall_deadline: tokio::time::Instant,
2639) -> Result<PumpDrainReport, OrchestratorError> {
2640 let topic = harn_vm::event_log::Topic::new(topic_name).map_err(|error| error.to_string())?;
2641 let consumer = pump_consumer_id(&topic)?;
2642 let start_seen = log
2643 .consumer_cursor(&topic, &consumer)
2644 .await
2645 .map_err(|error| format!("failed to read consumer cursor for {topic_name}: {error}"))?
2646 .unwrap_or(0);
2647 let up_to = log
2648 .latest(&topic)
2649 .await
2650 .map_err(|error| format!("failed to read topic head for {topic_name}: {error}"))?
2651 .unwrap_or(0);
2652 let budget = remaining_budget(overall_deadline);
2653
2654 match tokio::time::timeout(
2655 budget,
2656 pump.drain(log, topic_name, up_to, config, overall_deadline),
2657 )
2658 .await
2659 {
2660 Ok(Ok(report)) => Ok(report),
2661 Ok(Err(error)) => {
2662 eprintln!("[harn] warning: pump drain error for {topic_name}: {error}");
2663 best_effort_pump_report(
2664 log,
2665 &topic,
2666 &consumer,
2667 start_seen,
2668 up_to,
2669 PumpDrainStopReason::Error,
2670 )
2671 .await
2672 }
2673 Err(_) => {
2674 eprintln!(
2675 "[harn] warning: pump drain timed out for {topic_name} after {:?}",
2676 budget
2677 );
2678 best_effort_pump_report(
2679 log,
2680 &topic,
2681 &consumer,
2682 start_seen,
2683 up_to,
2684 PumpDrainStopReason::Deadline,
2685 )
2686 .await
2687 }
2688 }
2689}
2690
2691async fn best_effort_pump_report(
2692 log: &Arc<AnyEventLog>,
2693 topic: &harn_vm::event_log::Topic,
2694 consumer: &ConsumerId,
2695 start_seen: u64,
2696 up_to: u64,
2697 stop_reason: PumpDrainStopReason,
2698) -> Result<PumpDrainReport, OrchestratorError> {
2699 let last_seen = log
2700 .consumer_cursor(topic, consumer)
2701 .await
2702 .map_err(|error| format!("failed to read consumer cursor for {topic}: {error}"))?
2703 .unwrap_or(start_seen);
2704 let stats = PumpStats {
2705 last_seen,
2706 processed: last_seen.saturating_sub(start_seen),
2707 };
2708 Ok(pump_drain_report(stats, start_seen, up_to, 0, stop_reason))
2709}
2710
2711fn remaining_budget(deadline: tokio::time::Instant) -> Duration {
2712 deadline.saturating_duration_since(tokio::time::Instant::now())
2713}
2714
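/// Decides whether a drain is finished: fully caught up with no outstanding
/// tasks, over the deadline, or past the configured max_items budget.
/// Returns `None` while draining should continue.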
2715fn maybe_finish_pump_drain(
2716 stats: PumpStats,
2717 progress: PumpDrainProgress,
2718 outstanding_tasks: usize,
2719) -> Option<PumpDrainReport> {
2720 if stats.last_seen >= progress.request.up_to && outstanding_tasks == 0 {
2721 return Some(pump_drain_report(
2722 stats,
2723 progress.start_seen,
2724 progress.request.up_to,
2725 outstanding_tasks,
2726 PumpDrainStopReason::Drained,
2727 ));
2728 }
2729 if outstanding_tasks > 0 {
2730 if tokio::time::Instant::now() >= progress.request.deadline {
2731 return Some(pump_drain_report(
2732 stats,
2733 progress.start_seen,
2734 progress.request.up_to,
2735 outstanding_tasks,
2736 PumpDrainStopReason::Deadline,
2737 ));
2738 }
2739 return None;
2740 }
2741 let drain_items = stats.last_seen.saturating_sub(progress.start_seen);
2742 if drain_items >= progress.request.config.max_items as u64 {
2743 return Some(pump_drain_report(
2744 stats,
2745 progress.start_seen,
2746 progress.request.up_to,
2747 outstanding_tasks,
2748 PumpDrainStopReason::MaxItems,
2749 ));
2750 }
2751 if tokio::time::Instant::now() >= progress.request.deadline {
2752 return Some(pump_drain_report(
2753 stats,
2754 progress.start_seen,
2755 progress.request.up_to,
2756 outstanding_tasks,
2757 PumpDrainStopReason::Deadline,
2758 ));
2759 }
2760 None
2761}
2762
2763fn pump_drain_report(
2764 stats: PumpStats,
2765 start_seen: u64,
2766 up_to: u64,
2767 outstanding_tasks: usize,
2768 stop_reason: PumpDrainStopReason,
2769) -> PumpDrainReport {
2770 PumpDrainReport {
2771 stats,
2772 drain_items: stats.last_seen.saturating_sub(start_seen),
2773 remaining_queued: up_to.saturating_sub(stats.last_seen),
2774 outstanding_tasks,
2775 stop_reason,
2776 }
2777}
2778
2779fn pump_consumer_id(topic: &harn_vm::event_log::Topic) -> Result<ConsumerId, OrchestratorError> {
2780 ConsumerId::new(format!("orchestrator-pump.{}", topic.as_str())).map_err(|error| {
2781 OrchestratorError::Serve(format!("failed to create consumer id for {topic}: {error}"))
2782 })
2783}
2784
2785fn inbox_task_test_release_file() -> Option<PathBuf> {
2786 test_file_from_env(TEST_INBOX_TASK_RELEASE_FILE_ENV)
2787}
2788
2789fn test_file_from_env(key: &str) -> Option<PathBuf> {
2790 std::env::var_os(key)
2791 .filter(|value| !value.is_empty())
2792 .map(PathBuf::from)
2793}
2794
2795async fn wait_for_test_release_file(path: &Path) {
2796 while tokio::fs::metadata(path).await.is_err() {
2797 tokio::time::sleep(Duration::from_millis(10)).await;
2798 }
2799}
2800
2801fn pending_pump_test_should_fail() -> bool {
2802 std::env::var(TEST_FAIL_PENDING_PUMP_ENV)
2803 .ok()
2804 .is_some_and(|value| value != "0")
2805}
2806
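/// Watches the manifest's parent directory (non-recursively) and debounces
/// change events for the manifest file into admin reload requests tagged
/// "file_watch".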
2807fn spawn_manifest_watcher(
2808 config_path: PathBuf,
2809 reload: AdminReloadHandle,
2810) -> Result<notify::RecommendedWatcher, OrchestratorError> {
2811 use notify::{Event, EventKind, RecursiveMode, Watcher};
2812
2813 let watch_dir = config_path.parent().ok_or_else(|| {
2814 format!(
2815 "manifest has no parent directory: {}",
2816 config_path.display()
2817 )
2818 })?;
2819 let target_name = config_path
2820 .file_name()
2821 .and_then(|name| name.to_str())
2822 .ok_or_else(|| {
2823 format!(
2824 "manifest path is not valid UTF-8: {}",
2825 config_path.display()
2826 )
2827 })?
2828 .to_string();
2829 let (tx, mut rx) = mpsc::unbounded_channel::<()>();
2830 tokio::task::spawn_local(async move {
2831 while rx.recv().await.is_some() {
2832 tokio::time::sleep(Duration::from_millis(200)).await;
2833 while rx.try_recv().is_ok() {}
2834 let _ = reload.trigger("file_watch");
2835 }
2836 });
2837 let mut watcher =
2838 notify::recommended_watcher(move |res: Result<Event, notify::Error>| match res {
2839 Ok(event)
2840 if matches!(
2841 event.kind,
2842 EventKind::Modify(_)
2843 | EventKind::Create(_)
2844 | EventKind::Remove(_)
2845 | EventKind::Any
2846 ) && event.paths.iter().any(|path| {
2847 path.file_name()
2848 .and_then(|name| name.to_str())
2849 .is_some_and(|name| name == target_name)
2850 }) =>
2851 {
2852 let _ = tx.send(());
2853 }
2854 _ => {}
2855 })
2856 .map_err(|error| format!("failed to create manifest watcher: {error}"))?;
2857 watcher
2858 .watch(watch_dir, RecursiveMode::NonRecursive)
2859 .map_err(|error| {
2860 format!(
2861 "failed to watch manifest directory {}: {error}",
2862 watch_dir.display()
2863 )
2864 })?;
2865 Ok(watcher)
2866}
2867
2868pub(crate) fn load_manifest(config_path: &Path) -> Result<(Manifest, PathBuf), OrchestratorError> {
2869 if !config_path.is_file() {
2870 return Err(format!("manifest not found: {}", config_path.display()).into());
2871 }
2872 let content = std::fs::read_to_string(config_path)
2873 .map_err(|error| format!("failed to read {}: {error}", config_path.display()))?;
2874 let manifest = toml::from_str::<Manifest>(&content)
2875 .map_err(|error| format!("failed to parse {}: {error}", config_path.display()))?;
2876 let manifest_dir = config_path.parent().map(Path::to_path_buf).ok_or_else(|| {
2877 format!(
2878 "manifest has no parent directory: {}",
2879 config_path.display()
2880 )
2881 })?;
2882 Ok((manifest, manifest_dir))
2883}
2884
2885pub(crate) fn absolutize_from_cwd(path: &Path) -> Result<PathBuf, OrchestratorError> {
2886 let candidate = if path.is_absolute() {
2887 path.to_path_buf()
2888 } else {
2889 std::env::current_dir()
2890 .map_err(|error| format!("failed to read current directory: {error}"))?
2891 .join(path)
2892 };
2893 Ok(candidate)
2894}
2895
2896fn configured_secret_chain_display() -> String {
2897 std::env::var(harn_vm::secrets::SECRET_PROVIDER_CHAIN_ENV)
2898 .unwrap_or_else(|_| harn_vm::secrets::DEFAULT_SECRET_PROVIDER_CHAIN.to_string())
2899 .split(',')
2900 .map(str::trim)
2901 .filter(|segment| !segment.is_empty())
2902 .collect::<Vec<_>>()
2903 .join(" -> ")
2904}
2905
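/// Picks the secret namespace: HARN_SECRET_NAMESPACE when set and non-empty,
/// otherwise "harn/<manifest directory name>" with "workspace" as the fallback
/// leaf.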
2906fn secret_namespace_for(manifest_dir: &Path) -> String {
2907 match std::env::var("HARN_SECRET_NAMESPACE") {
2908 Ok(namespace) if !namespace.trim().is_empty() => namespace,
2909 _ => {
2910 let leaf = manifest_dir
2911 .file_name()
2912 .and_then(|name| name.to_str())
2913 .filter(|name| !name.is_empty())
2914 .unwrap_or("workspace");
2915 format!("harn/{leaf}")
2916 }
2917 }
2918}
2919
2920fn now_rfc3339() -> Result<String, OrchestratorError> {
2921 OffsetDateTime::now_utc()
2922 .format(&Rfc3339)
2923 .map_err(|error| OrchestratorError::Serve(format!("failed to format timestamp: {error}")))
2924}
2925
2926fn write_state_snapshot(
2927 path: &Path,
2928 snapshot: &ServeStateSnapshot,
2929) -> Result<(), OrchestratorError> {
2930 let encoded = serde_json::to_vec_pretty(snapshot)
2931 .map_err(|error| format!("failed to encode orchestrator state snapshot: {error}"))?;
2932 if let Some(parent) = path.parent() {
2933 std::fs::create_dir_all(parent)
2934 .map_err(|error| format!("failed to create {}: {error}", parent.display()))?;
2935 }
2936 std::fs::write(path, encoded).map_err(|error| {
2937 OrchestratorError::Serve(format!("failed to write {}: {error}", path.display()))
2938 })
2939}
2940
2941#[derive(Debug, Serialize)]
2944struct ServeStateSnapshot {
2945 status: String,
2946 role: String,
2947 bind: String,
2948 listener_url: String,
2949 manifest_path: String,
2950 state_dir: String,
2951 started_at: String,
2952 stopped_at: Option<String>,
2953 secret_provider_chain: String,
2954 event_log_backend: String,
2955 event_log_location: Option<String>,
2956 triggers: Vec<TriggerStateSnapshot>,
2957 connectors: Vec<String>,
2958 activations: Vec<ConnectorActivationSnapshot>,
2959}
2960
2961#[derive(Debug, Serialize)]
2962struct TriggerStateSnapshot {
2963 id: String,
2964 provider: String,
2965 kind: String,
2966 handler: String,
2967 version: Option<u32>,
2968 state: Option<String>,
2969 received: u64,
2970 dispatched: u64,
2971 failed: u64,
2972 in_flight: u64,
2973}
2974
2975#[derive(Debug, Serialize)]
2976struct ConnectorActivationSnapshot {
2977 provider: String,
2978 binding_count: usize,
2979}
2980
2981struct PlaceholderConnector {
2984 provider_id: harn_vm::ProviderId,
2985 kinds: Vec<harn_vm::TriggerKind>,
2986 _ctx: Option<harn_vm::ConnectorCtx>,
2987}
2988
2989impl PlaceholderConnector {
2990 fn new(provider_id: harn_vm::ProviderId, kinds: BTreeSet<String>) -> Self {
2991 Self {
2992 provider_id,
2993 kinds: kinds.into_iter().map(harn_vm::TriggerKind::from).collect(),
2994 _ctx: None,
2995 }
2996 }
2997}
2998
2999struct PlaceholderClient;
3000
3001#[async_trait]
3002impl harn_vm::ConnectorClient for PlaceholderClient {
3003 async fn call(
3004 &self,
3005 method: &str,
3006 _args: JsonValue,
3007 ) -> Result<JsonValue, harn_vm::ClientError> {
3008 Err(harn_vm::ClientError::Other(format!(
3009 "connector client method '{method}' is not implemented in the orchestrator scaffold"
3010 )))
3011 }
3012}
3013
3014#[async_trait]
3015impl harn_vm::Connector for PlaceholderConnector {
3016 fn provider_id(&self) -> &harn_vm::ProviderId {
3017 &self.provider_id
3018 }
3019
3020 fn kinds(&self) -> &[harn_vm::TriggerKind] {
3021 &self.kinds
3022 }
3023
3024 async fn init(&mut self, ctx: harn_vm::ConnectorCtx) -> Result<(), harn_vm::ConnectorError> {
3025 self._ctx = Some(ctx);
3026 Ok(())
3027 }
3028
3029 async fn activate(
3030 &self,
3031 bindings: &[harn_vm::TriggerBinding],
3032 ) -> Result<harn_vm::ActivationHandle, harn_vm::ConnectorError> {
3033 Ok(harn_vm::ActivationHandle::new(
3034 self.provider_id.clone(),
3035 bindings.len(),
3036 ))
3037 }
3038
3039 async fn normalize_inbound(
3040 &self,
3041 _raw: harn_vm::RawInbound,
3042 ) -> Result<harn_vm::TriggerEvent, harn_vm::ConnectorError> {
3043 Err(harn_vm::ConnectorError::Unsupported(format!(
3044 "connector '{}' inbound normalization is not implemented yet",
3045 self.provider_id.as_str()
3046 )))
3047 }
3048
3049 fn payload_schema(&self) -> harn_vm::ProviderPayloadSchema {
3050 harn_vm::ProviderPayloadSchema::named("TriggerEvent")
3051 }
3052
3053 fn client(&self) -> Arc<dyn harn_vm::ConnectorClient> {
3054 Arc::new(PlaceholderClient)
3055 }
3056}
3057
3058#[cfg(test)]
3059mod tests {
3060 use super::*;
3061 use futures::StreamExt;
3062 use harn_vm::event_log::{EventLog, Topic};
3063
3064 fn write_test_file(dir: &Path, relative: &str, contents: &str) {
3065 let path = dir.join(relative);
3066 if let Some(parent) = path.parent() {
3067 std::fs::create_dir_all(parent).unwrap();
3068 }
3069 std::fs::write(path, contents).unwrap();
3070 }
3071
3072 fn stream_manifest_fixture() -> &'static str {
3073 r#"
3074[package]
3075name = "fixture"
3076
3077[exports]
3078handlers = "lib.harn"
3079
3080[[triggers]]
3081id = "ws-stream"
3082kind = "stream"
3083provider = "websocket"
3084path = "/streams/ws"
3085match = { events = ["quote.tick"] }
3086handler = "handlers::on_stream"
3087"#
3088 }
3089
3090 fn stream_handler_fixture(marker_path: &Path) -> String {
3091 format!(
3092 r#"
3093import "std/triggers"
3094
3095pub fn on_stream(event: TriggerEvent) {{
3096 write_file({marker:?}, json_stringify({{
3097 provider: event.provider,
3098 kind: event.kind,
3099 key: event.provider_payload.key,
3100 stream: event.provider_payload.stream,
3101 amount: event.provider_payload.raw.value.amount,
3102 }}))
3103}}
3104"#,
3105 marker = marker_path.display().to_string()
3106 )
3107 }
3108
3109 #[tokio::test(flavor = "multi_thread")]
3113 async fn stream_trigger_route_uses_generic_stream_connector_in_process() {
3114 let _env_lock = crate::tests::common::env_lock::lock_env().lock().await;
3117 let _secret_providers = crate::env_guard::ScopedEnvVar::set("HARN_SECRET_PROVIDERS", "env");
3118
3119 let temp = tempfile::TempDir::new().unwrap();
3120 let marker_path = temp.path().join("stream-handler.json");
3121 write_test_file(temp.path(), "harn.toml", stream_manifest_fixture());
3122 write_test_file(
3123 temp.path(),
3124 "lib.harn",
3125 &stream_handler_fixture(&marker_path),
3126 );
3127
3128 let config =
3129 OrchestratorConfig::for_test(temp.path().join("harn.toml"), temp.path().join("state"));
3130 let harness = OrchestratorHarness::start(config)
3131 .await
3132 .expect("harness start");
3133 let base_url = harness.listener_url().to_string();
3134 let event_log = harness.event_log();
3135
3136 let response = reqwest::Client::new()
3137 .post(format!("{base_url}/streams/ws"))
3138 .header("content-type", "application/json")
3139 .json(&serde_json::json!({
3140 "key": "acct-1",
3141 "stream": "quotes",
3142 "value": {"amount": 10}
3143 }))
3144 .send()
3145 .await
3146 .unwrap();
3147 assert_eq!(response.status(), 200);
3148
3149 let topic = Topic::new("orchestrator.lifecycle").unwrap();
3152 let mut stream = event_log.clone().subscribe(&topic, None).await.unwrap();
3153 let deadline = tokio::time::Instant::now() + std::time::Duration::from_secs(30);
3154 loop {
3155 let remaining = deadline
3156 .checked_duration_since(tokio::time::Instant::now())
3157 .expect("timed out waiting for pump_dispatch_completed");
3158 let (_, event) = tokio::time::timeout(remaining, stream.next())
3159 .await
3160 .expect("timed out waiting for pump_dispatch_completed event")
3161 .expect("event stream ended unexpectedly")
3162 .expect("event stream error");
3163 if event.kind == "pump_dispatch_completed"
3164 && event.payload["status"] == serde_json::json!("completed")
3165 {
3166 break;
3167 }
3168 }
3169 drop(stream);
3170
3171 let marker: serde_json::Value =
3172 serde_json::from_str(&std::fs::read_to_string(&marker_path).unwrap()).unwrap();
3173 assert_eq!(
3174 marker.get("provider").and_then(|v| v.as_str()),
3175 Some("websocket")
3176 );
3177 assert_eq!(
3178 marker.get("kind").and_then(|v| v.as_str()),
3179 Some("quote.tick")
3180 );
3181 assert_eq!(marker.get("key").and_then(|v| v.as_str()), Some("acct-1"));
3182 assert_eq!(
3183 marker.get("stream").and_then(|v| v.as_str()),
3184 Some("quotes")
3185 );
3186 assert_eq!(marker.get("amount").and_then(|v| v.as_i64()), Some(10));
3187
3188 harness
3189 .shutdown(std::time::Duration::from_secs(5))
3190 .await
3191 .expect("harness shutdown");
3192 }
3193}