1use std::collections::{BTreeMap, BTreeSet};
2use std::net::SocketAddr;
3use std::path::{Path, PathBuf};
4use std::sync::Arc;
5use std::time::Duration;
6
7use async_trait::async_trait;
8use futures::StreamExt;
9use serde::{Deserialize, Serialize};
10use serde_json::{json, Value as JsonValue};
11use time::format_description::well_known::Rfc3339;
12use time::OffsetDateTime;
13use tokio::sync::{mpsc, oneshot, watch};
14use tokio::task::JoinSet;
15
16use harn_vm::event_log::{AnyEventLog, ConsumerId, EventLog};
17
18use super::common::stranded_envelopes;
19use super::errors::OrchestratorError;
20use super::listener::{
21 AdminReloadHandle, AdminReloadRequest, ListenerConfig, ListenerRuntime, RouteConfig,
22 TriggerMetricSnapshot,
23};
24use super::origin_guard::OriginAllowList;
25use super::role::OrchestratorRole;
26use super::supervisor_state::apply_supervisor_state;
27use super::tls::TlsFiles;
28use crate::package::{
29 self, CollectedManifestTrigger, CollectedTriggerHandler, Manifest,
30 ResolvedProviderConnectorConfig, ResolvedProviderConnectorKind, ResolvedTriggerConfig,
31};
32
33const LIFECYCLE_TOPIC: &str = "orchestrator.lifecycle";
34#[cfg_attr(not(unix), allow(dead_code))]
35const MANIFEST_TOPIC: &str = "orchestrator.manifest";
36const STATE_SNAPSHOT_FILE: &str = "orchestrator-state.json";
37const PENDING_TOPIC: &str = "orchestrator.triggers.pending";
38const CRON_TICK_TOPIC: &str = "connectors.cron.tick";
39const TEST_INBOX_TASK_RELEASE_FILE_ENV: &str = "HARN_TEST_ORCHESTRATOR_INBOX_TASK_RELEASE_FILE";
40const TEST_FAIL_PENDING_PUMP_ENV: &str = "HARN_TEST_ORCHESTRATOR_FAIL_PENDING_PUMP";
41const WAITPOINT_SERVICE_INTERVAL: Duration = Duration::from_millis(250);
42
43#[derive(Clone, Copy, Debug, PartialEq, Eq)]
47pub struct DrainConfig {
48 pub max_items: usize,
49 pub deadline: Duration,
50}
51
52impl Default for DrainConfig {
53 fn default() -> Self {
54 Self {
55 max_items: crate::package::default_orchestrator_drain_max_items(),
56 deadline: Duration::from_secs(
57 crate::package::default_orchestrator_drain_deadline_seconds(),
58 ),
59 }
60 }
61}
62
63#[derive(Clone, Copy, Debug, PartialEq, Eq)]
65pub struct PumpConfig {
66 pub max_outstanding: usize,
67}
68
69impl Default for PumpConfig {
70 fn default() -> Self {
71 Self {
72 max_outstanding: crate::package::default_orchestrator_pump_max_outstanding(),
73 }
74 }
75}
76
77#[derive(Clone, Debug)]
79pub struct OrchestratorConfig {
80 pub manifest_path: PathBuf,
81 pub state_dir: PathBuf,
82 pub bind: SocketAddr,
83 pub role: OrchestratorRole,
84 pub watch_manifest: bool,
85 pub mcp: bool,
86 pub mcp_path: String,
87 pub mcp_sse_path: String,
88 pub mcp_messages_path: String,
89 pub tls: Option<TlsFiles>,
90 pub shutdown_timeout: Duration,
91 pub drain: DrainConfig,
92 pub pump: PumpConfig,
93 pub log_format: Option<harn_vm::observability::otel::LogFormat>,
96}
97
98impl OrchestratorConfig {
99 pub fn for_test(manifest_path: PathBuf, state_dir: PathBuf) -> Self {
101 Self {
102 manifest_path,
103 state_dir,
104 bind: "127.0.0.1:0".parse().unwrap(),
105 role: OrchestratorRole::SingleTenant,
106 watch_manifest: false,
107 mcp: false,
108 mcp_path: "/mcp".to_string(),
109 mcp_sse_path: "/sse".to_string(),
110 mcp_messages_path: "/messages".to_string(),
111 tls: None,
112 shutdown_timeout: Duration::from_secs(5),
113 drain: DrainConfig::default(),
114 pump: PumpConfig::default(),
115 log_format: None,
116 }
117 }
118}
119
120#[derive(Debug)]
124pub struct ShutdownReport {
125 #[allow(dead_code)]
126 pub timed_out: bool,
127}
128
129#[derive(Debug)]
131pub struct HarnessError(pub String);
132
133impl std::fmt::Display for HarnessError {
134 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
135 f.write_str(&self.0)
136 }
137}
138
139impl From<OrchestratorError> for HarnessError {
140 fn from(error: OrchestratorError) -> Self {
141 HarnessError(error.to_string())
142 }
143}
144
145impl From<String> for HarnessError {
146 fn from(s: String) -> Self {
147 HarnessError(s)
148 }
149}
150
151#[allow(dead_code)]
158pub struct OrchestratorHarness {
159 event_log: Arc<AnyEventLog>,
160 listener_url: String,
161 local_addr: SocketAddr,
162 state_dir: PathBuf,
163 admin_reload: AdminReloadHandle,
164 shutdown_tx: Arc<watch::Sender<bool>>,
165 pump_drain_gate: PumpDrainGate,
166 join: Option<std::thread::JoinHandle<()>>,
167}
168
169struct ReadyState {
170 event_log: Arc<AnyEventLog>,
171 listener_url: String,
172 local_addr: SocketAddr,
173 state_dir: PathBuf,
174 admin_reload: AdminReloadHandle,
175}
176
177#[derive(Clone)]
178struct PumpDrainGate {
179 hold_tx: watch::Sender<bool>,
180}
181
182impl PumpDrainGate {
183 fn new() -> Self {
184 let (hold_tx, _) = watch::channel(false);
185 Self { hold_tx }
186 }
187
188 fn pause(&self) {
189 let _ = self.hold_tx.send(true);
190 }
191
192 fn release(&self) {
193 let _ = self.hold_tx.send(false);
194 }
195
196 fn subscribe(&self) -> watch::Receiver<bool> {
197 self.hold_tx.subscribe()
198 }
199}
200
201#[allow(dead_code)]
202impl OrchestratorHarness {
203 pub async fn start(config: OrchestratorConfig) -> Result<Self, HarnessError> {
206 let (ready_tx, ready_rx) = oneshot::channel::<Result<ReadyState, OrchestratorError>>();
207 let (shutdown_tx, shutdown_rx) = watch::channel(false);
208 let shutdown_tx = Arc::new(shutdown_tx);
209 let pump_drain_gate = PumpDrainGate::new();
210 let task_pump_drain_gate = pump_drain_gate.clone();
211
212 let join = std::thread::spawn(move || {
213 let rt = tokio::runtime::Builder::new_multi_thread()
220 .worker_threads(2)
221 .enable_all()
222 .build()
223 .expect("failed to build OrchestratorHarness tokio runtime");
224 let local = tokio::task::LocalSet::new();
225 rt.block_on(local.run_until(orchestrator_task(
226 config,
227 ready_tx,
228 shutdown_rx,
229 task_pump_drain_gate,
230 )));
231 });
232
233 match ready_rx.await {
234 Ok(Ok(ready)) => Ok(Self {
235 event_log: ready.event_log,
236 listener_url: ready.listener_url,
237 local_addr: ready.local_addr,
238 state_dir: ready.state_dir,
239 admin_reload: ready.admin_reload,
240 shutdown_tx,
241 pump_drain_gate,
242 join: Some(join),
243 }),
244 Ok(Err(error)) => {
245 let _ = join.join();
246 Err(HarnessError::from(error))
247 }
248 Err(_) => {
249 let _ = join.join();
250 Err(HarnessError(
251 "harness thread exited before signaling readiness".to_string(),
252 ))
253 }
254 }
255 }
256
257 pub fn listener_url(&self) -> &str {
258 &self.listener_url
259 }
260
261 pub fn local_addr(&self) -> SocketAddr {
262 self.local_addr
263 }
264
265 pub fn event_log(&self) -> Arc<AnyEventLog> {
266 self.event_log.clone()
267 }
268
269 pub fn state_dir(&self) -> &Path {
270 &self.state_dir
271 }
272
273 pub fn admin_reload(&self) -> AdminReloadHandle {
276 self.admin_reload.clone()
277 }
278
279 pub fn shutdown_trigger(&self) -> Arc<watch::Sender<bool>> {
282 self.shutdown_tx.clone()
283 }
284
285 pub fn pause_pump_drain(&self) {
288 self.pump_drain_gate.pause();
289 }
290
291 pub fn release_pump_drain(&self) {
293 self.pump_drain_gate.release();
294 }
295
296 pub async fn shutdown(mut self, _deadline: Duration) -> Result<ShutdownReport, HarnessError> {
298 let _ = self.shutdown_tx.send(true);
300 let join = self.join.take().expect("join handle");
302 tokio::task::spawn_blocking(move || join.join())
303 .await
304 .map_err(|_| HarnessError("spawn_blocking join failed".to_string()))?
305 .map_err(|_| HarnessError("harness background thread panicked".to_string()))?;
306 Ok(ShutdownReport { timed_out: false })
307 }
308}
309
310impl Drop for OrchestratorHarness {
311 fn drop(&mut self) {
312 let _ = self.shutdown_tx.send(true);
313 if let Some(join) = self.join.take() {
314 let _ = join.join();
315 }
316 }
317}
318
319async fn orchestrator_task(
322 config: OrchestratorConfig,
323 ready_tx: oneshot::Sender<Result<ReadyState, OrchestratorError>>,
324 shutdown_rx: watch::Receiver<bool>,
325 pump_drain_gate: PumpDrainGate,
326) {
327 if let Err(error) = orchestrator_lifecycle(config, ready_tx, shutdown_rx, pump_drain_gate).await
328 {
329 eprintln!("[harn] orchestrator harness error: {error}");
330 }
331}
332
333async fn orchestrator_lifecycle(
334 config: OrchestratorConfig,
335 ready_tx: oneshot::Sender<Result<ReadyState, OrchestratorError>>,
336 mut shutdown_rx: watch::Receiver<bool>,
337 pump_drain_gate: PumpDrainGate,
338) -> Result<(), OrchestratorError> {
339 harn_vm::reset_thread_local_state();
340
341 let shutdown_timeout = config.shutdown_timeout;
342 let drain_config = config.drain;
343 let pump_config = PumpConfig {
344 max_outstanding: config.pump.max_outstanding.max(1),
345 };
346
347 let state_dir = config.state_dir.clone();
348 std::fs::create_dir_all(&state_dir).map_err(|error| {
349 format!(
350 "failed to create state dir {}: {error}",
351 state_dir.display()
352 )
353 })?;
354
355 let observability = if let Some(log_format) = config.log_format {
356 Some(
357 harn_vm::observability::otel::ObservabilityGuard::install_orchestrator_subscriber(
358 harn_vm::observability::otel::OrchestratorObservabilityConfig {
359 log_format,
360 state_dir: Some(state_dir.clone()),
361 },
362 )?,
363 )
364 } else {
365 None
366 };
367
368 let config_path = absolutize_from_cwd(&config.manifest_path)?;
369 let (manifest, manifest_dir) = load_manifest(&config_path)?;
370 let drain_config = DrainConfig {
371 max_items: drain_config.max_items.max(1),
372 deadline: drain_config.deadline,
373 };
374 let pump_config = PumpConfig {
375 max_outstanding: pump_config.max_outstanding.max(1),
376 };
377
378 let startup_started_at = now_rfc3339()?;
379 let (admin_reload, mut reload_rx) = AdminReloadHandle::channel();
380
381 eprintln!("[harn] orchestrator manifest: {}", config_path.display());
382 if let Some(name) = manifest
383 .package
384 .as_ref()
385 .and_then(|package| package.name.as_deref())
386 {
387 eprintln!("[harn] orchestrator package: {name}");
388 }
389 eprintln!(
390 "[harn] orchestrator role: {} ({})",
391 config.role.as_str(),
392 config.role.registry_mode()
393 );
394 eprintln!("[harn] orchestrator state dir: {}", state_dir.display());
395 tracing::info!(
396 component = "orchestrator",
397 trace_id = "",
398 role = config.role.as_str(),
399 state_dir = %state_dir.display(),
400 manifest = %config_path.display(),
401 "orchestrator starting"
402 );
403
404 let workspace_root = manifest_dir.clone();
405 let mut vm = config
406 .role
407 .build_vm(&workspace_root, &manifest_dir, &state_dir)?;
408
409 let event_log = harn_vm::event_log::active_event_log()
410 .ok_or_else(|| "event log was not installed during VM initialization".to_string())?;
411 let event_log_description = event_log.describe();
412 let tenant_store = if config.role == OrchestratorRole::MultiTenant {
413 let store = harn_vm::TenantStore::load(&state_dir)?;
414 let active_tenants = store
415 .list()
416 .into_iter()
417 .filter(|tenant| tenant.status == harn_vm::TenantStatus::Active)
418 .collect::<Vec<_>>();
419 eprintln!(
420 "[harn] tenants loaded: {} active ({})",
421 active_tenants.len(),
422 active_tenants
423 .iter()
424 .map(|tenant| tenant.scope.id.0.as_str())
425 .collect::<Vec<_>>()
426 .join(", ")
427 );
428 Some(Arc::new(store))
429 } else {
430 None
431 };
432 eprintln!(
433 "[harn] event log: {} {}",
434 event_log_description.backend,
435 event_log_description
436 .location
437 .as_ref()
438 .map(|path| path.display().to_string())
439 .unwrap_or_else(|| "<memory>".to_string())
440 );
441
442 let secret_namespace = secret_namespace_for(&manifest_dir);
443 let secret_chain_display = configured_secret_chain_display();
444 let secret_chain = harn_vm::secrets::configured_default_chain(secret_namespace.clone())
445 .map_err(|error| format!("failed to configure secret providers: {error}"))?;
446 if secret_chain.providers().is_empty() {
447 return Err("secret provider chain resolved to zero providers"
448 .to_string()
449 .into());
450 }
451 eprintln!(
452 "[harn] secret providers: {} (namespace {})",
453 secret_chain_display, secret_namespace
454 );
455 let secret_provider: Arc<dyn harn_vm::secrets::SecretProvider> = Arc::new(secret_chain);
456
457 let extensions = package::load_runtime_extensions(&config_path);
458 let metrics_registry = Arc::new(harn_vm::MetricsRegistry::default());
459 harn_vm::install_active_metrics_registry(metrics_registry.clone());
460 let collected_triggers = package::collect_manifest_triggers(&mut vm, &extensions)
461 .await
462 .map_err(|error| format!("failed to collect manifest triggers: {error}"))?;
463 package::install_collected_manifest_triggers(&collected_triggers).await?;
464 apply_supervisor_state(&state_dir).await?;
465 eprintln!(
466 "[harn] registered triggers ({}): {}",
467 collected_triggers.len(),
468 format_trigger_summary(&collected_triggers)
469 );
470
471 let binding_versions = live_manifest_binding_versions();
472 let route_configs = build_route_configs(&collected_triggers, &binding_versions)?;
473 let mut connector_runtime = initialize_connectors(
474 &collected_triggers,
475 event_log.clone(),
476 secret_provider.clone(),
477 metrics_registry.clone(),
478 &extensions.provider_connectors,
479 )
480 .await?;
481 let route_configs = attach_route_connectors(
482 route_configs,
483 &connector_runtime.registry,
484 &extensions.provider_connectors,
485 )?;
486 let connector_clients = connector_runtime.registry.client_map().await;
487 harn_vm::install_active_connector_clients(connector_clients);
488 eprintln!(
489 "[harn] registered connectors ({}): {}",
490 connector_runtime.providers.len(),
491 connector_runtime.providers.join(", ")
492 );
493 eprintln!(
494 "[harn] activated connectors: {}",
495 format_activation_summary(&connector_runtime.activations)
496 );
497 let (mcp_router, mcp_service) = if config.mcp {
498 validate_mcp_paths(
499 &config.mcp_path,
500 &config.mcp_sse_path,
501 &config.mcp_messages_path,
502 )?;
503 if !has_orchestrator_api_keys_configured() && !has_mcp_oauth_configured() {
504 return Err(OrchestratorError::Serve(
505 "--mcp requires HARN_ORCHESTRATOR_API_KEYS or HARN_MCP_OAUTH_AUTHORIZATION_SERVERS so the embedded MCP management surface is authenticated"
506 .to_string(),
507 ));
508 }
509 let service = Arc::new(
510 crate::commands::mcp::serve::McpOrchestratorService::new_local(
511 crate::cli::OrchestratorLocalArgs {
512 config: config_path.clone(),
513 state_dir: state_dir.clone(),
514 },
515 )?,
516 );
517 let router = crate::commands::mcp::serve::http_router_for_service(
518 service.clone(),
519 config.mcp_path.clone(),
520 config.mcp_sse_path.clone(),
521 config.mcp_messages_path.clone(),
522 );
523 eprintln!(
524 "[harn] embedded MCP server mounted at {} (legacy SSE {}, messages {})",
525 config.mcp_path, config.mcp_sse_path, config.mcp_messages_path
526 );
527 (Some(router), Some(service))
528 } else {
529 (None, None)
530 };
531
532 let dispatcher = harn_vm::Dispatcher::with_event_log_and_metrics(
533 vm,
534 event_log.clone(),
535 Some(metrics_registry.clone()),
536 );
537 let mut pending_pumps = vec![(
538 PENDING_TOPIC.to_string(),
539 spawn_pending_pump(
540 event_log.clone(),
541 dispatcher.clone(),
542 pump_config,
543 metrics_registry.clone(),
544 pump_drain_gate.clone(),
545 PENDING_TOPIC,
546 )?,
547 )];
548 let mut inbox_pumps = vec![(
549 harn_vm::TRIGGER_INBOX_ENVELOPES_TOPIC.to_string(),
550 spawn_inbox_pump(
551 event_log.clone(),
552 dispatcher.clone(),
553 pump_config,
554 metrics_registry.clone(),
555 harn_vm::TRIGGER_INBOX_ENVELOPES_TOPIC,
556 )?,
557 )];
558 if let Some(store) = tenant_store.as_ref() {
559 for tenant in store
560 .list()
561 .into_iter()
562 .filter(|tenant| tenant.status == harn_vm::TenantStatus::Active)
563 {
564 let pending_topic = harn_vm::tenant_topic(
565 &tenant.scope.id,
566 &harn_vm::event_log::Topic::new(PENDING_TOPIC)
567 .map_err(|error| error.to_string())?,
568 )
569 .map_err(|error| error.to_string())?;
570 pending_pumps.push((
571 pending_topic.as_str().to_string(),
572 spawn_pending_pump(
573 event_log.clone(),
574 dispatcher.clone(),
575 pump_config,
576 metrics_registry.clone(),
577 pump_drain_gate.clone(),
578 pending_topic.as_str(),
579 )?,
580 ));
581 let inbox_topic = harn_vm::tenant_topic(
582 &tenant.scope.id,
583 &harn_vm::event_log::Topic::new(harn_vm::TRIGGER_INBOX_ENVELOPES_TOPIC)
584 .map_err(|error| error.to_string())?,
585 )
586 .map_err(|error| error.to_string())?;
587 inbox_pumps.push((
588 inbox_topic.as_str().to_string(),
589 spawn_inbox_pump(
590 event_log.clone(),
591 dispatcher.clone(),
592 pump_config,
593 metrics_registry.clone(),
594 inbox_topic.as_str(),
595 )?,
596 ));
597 }
598 }
599 let cron_pump = spawn_cron_pump(
600 event_log.clone(),
601 dispatcher.clone(),
602 pump_config,
603 metrics_registry.clone(),
604 pump_drain_gate.clone(),
605 )?;
606 let waitpoint_pump = spawn_waitpoint_resume_pump(
607 event_log.clone(),
608 dispatcher.clone(),
609 pump_config,
610 metrics_registry.clone(),
611 pump_drain_gate.clone(),
612 )?;
613 let waitpoint_cancel_pump = spawn_waitpoint_cancel_pump(
614 event_log.clone(),
615 dispatcher.clone(),
616 pump_config,
617 metrics_registry.clone(),
618 pump_drain_gate.clone(),
619 )?;
620 let waitpoint_sweeper = spawn_waitpoint_sweeper(dispatcher.clone());
621
622 let listener = ListenerRuntime::start(ListenerConfig {
623 bind: config.bind,
624 tls: config.tls.clone(),
625 event_log: event_log.clone(),
626 secrets: secret_provider.clone(),
627 allowed_origins: OriginAllowList::from_manifest(&manifest.orchestrator.allowed_origins),
628 max_body_bytes: ListenerConfig::max_body_bytes_or_default(
629 manifest.orchestrator.max_body_bytes,
630 ),
631 metrics_registry: metrics_registry.clone(),
632 admin_reload: Some(admin_reload.clone()),
633 mcp_router,
634 routes: route_configs,
635 tenant_store: tenant_store.clone(),
636 session_store: Some(Arc::new(harn_vm::SessionStore::new(event_log.clone()))),
637 })
638 .await?;
639 let local_bind = listener.local_addr();
640 let listener_metrics = listener.trigger_metrics();
641 let mut live_manifest = manifest;
642 let mut live_triggers = collected_triggers;
643 let _manifest_watcher = if config.watch_manifest {
644 Some(spawn_manifest_watcher(
645 config_path.clone(),
646 admin_reload.clone(),
647 )?)
648 } else {
649 None
650 };
651 connector_runtime.activations = connector_runtime
652 .registry
653 .activate_all(&connector_runtime.trigger_registry)
654 .await
655 .map_err(|error| error.to_string())?;
656 eprintln!(
657 "[harn] activated connectors: {}",
658 format_activation_summary(&connector_runtime.activations)
659 );
660
661 listener.mark_ready();
662 eprintln!("[harn] HTTP listener ready on {}", listener.url());
663 tracing::info!(
664 component = "orchestrator",
665 trace_id = "",
666 listener_url = %listener.url(),
667 "HTTP listener ready"
668 );
669
670 write_state_snapshot(
671 &state_dir.join(STATE_SNAPSHOT_FILE),
672 &ServeStateSnapshot {
673 status: "running".to_string(),
674 role: config.role.as_str().to_string(),
675 bind: local_bind.to_string(),
676 listener_url: listener.url(),
677 manifest_path: config_path.display().to_string(),
678 state_dir: state_dir.display().to_string(),
679 started_at: startup_started_at.clone(),
680 stopped_at: None,
681 secret_provider_chain: secret_chain_display.clone(),
682 event_log_backend: event_log_description.backend.to_string(),
683 event_log_location: event_log_description
684 .location
685 .as_ref()
686 .map(|path| path.display().to_string()),
687 triggers: trigger_state_snapshots(&live_triggers, &listener_metrics),
688 connectors: connector_runtime.providers.clone(),
689 activations: connector_runtime
690 .activations
691 .iter()
692 .map(|activation| ConnectorActivationSnapshot {
693 provider: activation.provider.as_str().to_string(),
694 binding_count: activation.binding_count,
695 })
696 .collect(),
697 },
698 )?;
699
700 append_lifecycle_event(
701 &event_log,
702 "startup",
703 json!({
704 "bind": local_bind.to_string(),
705 "manifest": config_path.display().to_string(),
706 "role": config.role.as_str(),
707 "state_dir": state_dir.display().to_string(),
708 "trigger_count": live_triggers.len(),
709 "connector_count": connector_runtime.providers.len(),
710 "tls_enabled": listener.scheme() == "https",
711 "shutdown_timeout_secs": shutdown_timeout.as_secs(),
712 "drain_max_items": drain_config.max_items,
713 "drain_deadline_secs": drain_config.deadline.as_secs(),
714 "pump_max_outstanding": pump_config.max_outstanding,
715 }),
716 )
717 .await?;
718
719 let stranded = stranded_envelopes(&event_log, Duration::ZERO).await?;
720 if !stranded.is_empty() {
721 eprintln!(
722 "[harn] startup found {} stranded inbox envelope(s); inspect with `harn orchestrator queue` and recover explicitly with `harn orchestrator recover --dry-run --envelope-age ...`",
723 stranded.len()
724 );
725 }
726 append_lifecycle_event(
727 &event_log,
728 "startup_stranded_envelopes",
729 json!({
730 "count": stranded.len(),
731 }),
732 )
733 .await?;
734
735 let _ = ready_tx.send(Ok(ReadyState {
737 event_log: event_log.clone(),
738 listener_url: listener.url(),
739 local_addr: local_bind,
740 state_dir: state_dir.clone(),
741 admin_reload: admin_reload.clone(),
742 }));
743
744 let mut ctx = RuntimeCtx {
746 role: config.role,
747 config_path: &config_path,
748 state_dir: &state_dir,
749 bind: local_bind,
750 startup_started_at: &startup_started_at,
751 event_log: &event_log,
752 event_log_description: &event_log_description,
753 secret_chain_display: &secret_chain_display,
754 listener: &listener,
755 connectors: &mut connector_runtime,
756 live_manifest: &mut live_manifest,
757 live_triggers: &mut live_triggers,
758 secret_provider: &secret_provider,
759 metrics_registry: &metrics_registry,
760 mcp_service: mcp_service.as_ref(),
761 reload_rx: &mut reload_rx,
762 };
763
764 loop {
765 tokio::select! {
766 changed = shutdown_rx.changed() => {
767 if changed.is_err() || *shutdown_rx.borrow() {
768 break;
769 }
770 }
771 Some(request) = ctx.reload_rx.recv() => {
772 handle_reload_request(&mut ctx, request).await?;
773 }
774 }
775 }
776
777 listener.mark_not_ready();
778 let shutdown = graceful_shutdown(
779 GracefulShutdownCtx {
780 role: config.role,
781 bind: local_bind,
782 listener_url: listener.url(),
783 config_path: &config_path,
784 state_dir: &state_dir,
785 startup_started_at: &startup_started_at,
786 event_log: &event_log,
787 event_log_description: &event_log_description,
788 secret_chain_display: &secret_chain_display,
789 triggers: &live_triggers,
790 connectors: &connector_runtime,
791 shutdown_timeout,
792 drain_config,
793 },
794 listener,
795 dispatcher,
796 pending_pumps,
797 cron_pump,
798 inbox_pumps,
799 waitpoint_pump,
800 waitpoint_cancel_pump,
801 waitpoint_sweeper,
802 )
803 .await;
804
805 if let Some(obs) = observability {
806 if let Err(error) = obs.shutdown() {
807 if shutdown.is_ok() {
808 return Err(OrchestratorError::Serve(error));
809 }
810 eprintln!("[harn] observability shutdown warning: {error}");
811 }
812 }
813 harn_vm::clear_active_metrics_registry();
814 shutdown
815}
816
817struct ConnectorRuntime {
820 registry: harn_vm::ConnectorRegistry,
821 trigger_registry: harn_vm::TriggerRegistry,
822 handles: Vec<harn_vm::connectors::ConnectorHandle>,
823 providers: Vec<String>,
824 activations: Vec<harn_vm::ActivationHandle>,
825 #[cfg_attr(not(unix), allow(dead_code))]
826 provider_overrides: Vec<ResolvedProviderConnectorConfig>,
827}
828
829#[cfg_attr(not(unix), allow(dead_code))]
830#[derive(Clone, Debug, Default, Serialize)]
831struct ManifestReloadSummary {
832 added: Vec<String>,
833 modified: Vec<String>,
834 removed: Vec<String>,
835 unchanged: Vec<String>,
836}
837
838#[derive(Clone, Copy, Debug, PartialEq, Eq)]
839enum PumpMode {
840 Running,
841 Draining(PumpDrainRequest),
842}
843
844#[derive(Clone, Copy, Debug, PartialEq, Eq)]
845struct PumpDrainRequest {
846 up_to: u64,
847 config: DrainConfig,
848 deadline: tokio::time::Instant,
849}
850
851#[derive(Clone, Copy, Debug, PartialEq, Eq)]
852enum PumpDrainStopReason {
853 Drained,
854 MaxItems,
855 Deadline,
856 Error,
857}
858
859impl PumpDrainStopReason {
860 fn as_str(self) -> &'static str {
861 match self {
862 Self::Drained => "drained",
863 Self::MaxItems => "max_items",
864 Self::Deadline => "deadline",
865 Self::Error => "error",
866 }
867 }
868}
869
870#[derive(Clone, Copy, Debug, Default)]
871struct PumpStats {
872 last_seen: u64,
873 processed: u64,
874}
875
876#[derive(Clone, Copy, Debug)]
877struct PumpDrainProgress {
878 request: PumpDrainRequest,
879 start_seen: u64,
880}
881
882#[derive(Clone, Copy, Debug)]
883struct PumpDrainReport {
884 stats: PumpStats,
885 drain_items: u64,
886 remaining_queued: u64,
887 outstanding_tasks: usize,
888 stop_reason: PumpDrainStopReason,
889}
890
891impl PumpDrainReport {
892 fn truncated(self) -> bool {
893 self.remaining_queued > 0 || self.outstanding_tasks > 0
894 }
895}
896
897struct PumpHandle {
898 mode_tx: watch::Sender<PumpMode>,
899 join: tokio::task::JoinHandle<Result<PumpDrainReport, OrchestratorError>>,
900}
901
902impl PumpHandle {
903 async fn drain(
904 self,
905 log: &Arc<AnyEventLog>,
906 topic_name: &str,
907 up_to: u64,
908 config: DrainConfig,
909 overall_deadline: tokio::time::Instant,
910 ) -> Result<PumpDrainReport, OrchestratorError> {
911 let drain_deadline = std::cmp::min(
912 tokio::time::Instant::now() + config.deadline,
913 overall_deadline,
914 );
915 let _ = self.mode_tx.send(PumpMode::Draining(PumpDrainRequest {
916 up_to,
917 config,
918 deadline: drain_deadline,
919 }));
920 append_pump_lifecycle_event(
921 log,
922 "pump_drain_started",
923 json!({
924 "topic": topic_name,
925 "up_to": up_to,
926 "max_items": config.max_items,
927 "drain_deadline_ms": config.deadline.as_millis(),
928 }),
929 )
930 .await?;
931 match self.join.await {
932 Ok(result) => result,
933 Err(error) => Err(format!("pump task join failed: {error}").into()),
934 }
935 }
936}
937
938struct WaitpointSweepHandle {
939 stop_tx: watch::Sender<bool>,
940 join: tokio::task::JoinHandle<Result<(), OrchestratorError>>,
941}
942
943impl WaitpointSweepHandle {
944 async fn shutdown(self) -> Result<(), OrchestratorError> {
945 let _ = self.stop_tx.send(true);
946 match self.join.await {
947 Ok(result) => result,
948 Err(error) => Err(format!("waitpoint sweeper join failed: {error}").into()),
949 }
950 }
951}
952
953#[derive(Debug, Deserialize)]
954struct PendingTriggerRecord {
955 trigger_id: String,
956 binding_version: u32,
957 event: harn_vm::TriggerEvent,
958}
959
960fn spawn_pending_pump(
963 event_log: Arc<harn_vm::event_log::AnyEventLog>,
964 dispatcher: harn_vm::Dispatcher,
965 pump_config: PumpConfig,
966 metrics_registry: Arc<harn_vm::MetricsRegistry>,
967 pump_drain_gate: PumpDrainGate,
968 topic_name: &str,
969) -> Result<PumpHandle, OrchestratorError> {
970 let topic = harn_vm::event_log::Topic::new(topic_name).map_err(|error| error.to_string())?;
971 spawn_topic_pump(
972 event_log,
973 topic,
974 pump_config,
975 metrics_registry,
976 pump_drain_gate,
977 move |logged| {
978 let dispatcher = dispatcher.clone();
979 async move {
980 if pending_pump_test_should_fail() {
981 return Err("test pending pump failure".to_string().into());
982 }
983 if logged.kind != "trigger_event" {
984 return Ok(false);
985 }
986 let record: PendingTriggerRecord = serde_json::from_value(logged.payload)
987 .map_err(|error| format!("failed to decode pending trigger event: {error}"))?;
988 dispatcher
989 .enqueue_targeted_with_headers(
990 Some(record.trigger_id),
991 Some(record.binding_version),
992 record.event,
993 Some(&logged.headers),
994 )
995 .await
996 .map_err(|error| format!("failed to enqueue pending trigger event: {error}"))?;
997 Ok(true)
998 }
999 },
1000 )
1001}
1002
1003fn spawn_cron_pump(
1004 event_log: Arc<harn_vm::event_log::AnyEventLog>,
1005 dispatcher: harn_vm::Dispatcher,
1006 pump_config: PumpConfig,
1007 metrics_registry: Arc<harn_vm::MetricsRegistry>,
1008 pump_drain_gate: PumpDrainGate,
1009) -> Result<PumpHandle, OrchestratorError> {
1010 let topic =
1011 harn_vm::event_log::Topic::new(CRON_TICK_TOPIC).map_err(|error| error.to_string())?;
1012 spawn_topic_pump(
1013 event_log,
1014 topic,
1015 pump_config,
1016 metrics_registry,
1017 pump_drain_gate,
1018 move |logged| {
1019 let dispatcher = dispatcher.clone();
1020 async move {
1021 if logged.kind != "trigger_event" {
1022 return Ok(false);
1023 }
1024 let event: harn_vm::TriggerEvent = serde_json::from_value(logged.payload)
1025 .map_err(|error| format!("failed to decode cron trigger event: {error}"))?;
1026 let trigger_id = match &event.provider_payload {
1027 harn_vm::ProviderPayload::Known(
1028 harn_vm::triggers::event::KnownProviderPayload::Cron(payload),
1029 ) => payload.cron_id.clone(),
1030 _ => None,
1031 };
1032 dispatcher
1033 .enqueue_targeted_with_headers(trigger_id, None, event, Some(&logged.headers))
1034 .await
1035 .map_err(|error| format!("failed to enqueue cron trigger event: {error}"))?;
1036 Ok(true)
1037 }
1038 },
1039 )
1040}
1041
1042fn spawn_inbox_pump(
1043 event_log: Arc<harn_vm::event_log::AnyEventLog>,
1044 dispatcher: harn_vm::Dispatcher,
1045 pump_config: PumpConfig,
1046 metrics_registry: Arc<harn_vm::MetricsRegistry>,
1047 topic_name: &str,
1048) -> Result<PumpHandle, OrchestratorError> {
1049 let topic = harn_vm::event_log::Topic::new(topic_name).map_err(|error| error.to_string())?;
1050 let consumer = pump_consumer_id(&topic)?;
1051 let inbox_task_release_file = inbox_task_test_release_file();
1052 let (mode_tx, mut mode_rx) = watch::channel(PumpMode::Running);
1053 let join = tokio::task::spawn_local(async move {
1054 let start_from = event_log
1055 .consumer_cursor(&topic, &consumer)
1056 .await
1057 .map_err(|error| format!("failed to read consumer cursor for {topic}: {error}"))?
1058 .or(event_log
1059 .latest(&topic)
1060 .await
1061 .map_err(|error| format!("failed to read topic head {topic}: {error}"))?);
1062 let mut stream = event_log
1063 .clone()
1064 .subscribe(&topic, start_from)
1065 .await
1066 .map_err(|error| format!("failed to subscribe topic {topic}: {error}"))?;
1067 let mut stats = PumpStats {
1068 last_seen: start_from.unwrap_or(0),
1069 processed: 0,
1070 };
1071 record_pump_metrics(&metrics_registry, &event_log, &topic, stats.last_seen, 0).await?;
1072 let mut drain_progress = None;
1073 let mut tasks = JoinSet::new();
1074
1075 loop {
1076 if let Some(progress) = drain_progress {
1077 if let Some(report) = maybe_finish_pump_drain(stats, progress, tasks.len()) {
1078 return Ok(report);
1079 }
1080 }
1081
1082 let deadline = drain_progress.map(|progress| progress.request.deadline);
1083 let mut deadline_wait = Box::pin(async move {
1084 if let Some(deadline) = deadline {
1085 tokio::time::sleep_until(deadline).await;
1086 } else {
1087 std::future::pending::<()>().await;
1088 }
1089 });
1090
1091 tokio::select! {
1092 changed = mode_rx.changed() => {
1093 if changed.is_err() {
1094 break;
1095 }
1096 if let PumpMode::Draining(request) = *mode_rx.borrow() {
1097 drain_progress.get_or_insert(PumpDrainProgress {
1098 request,
1099 start_seen: stats.last_seen,
1100 });
1101 }
1102 }
1103 _ = &mut deadline_wait => {
1104 if let Some(progress) = drain_progress {
1105 return Ok(pump_drain_report(
1106 stats,
1107 progress.start_seen,
1108 progress.request.up_to,
1109 tasks.len(),
1110 PumpDrainStopReason::Deadline,
1111 ));
1112 }
1113 }
1114 joined = tasks.join_next(), if !tasks.is_empty() => {
1115 match joined {
1116 Some(Ok(())) => {
1117 record_pump_metrics(
1118 &metrics_registry,
1119 &event_log,
1120 &topic,
1121 stats.last_seen,
1122 tasks.len(),
1123 )
1124 .await?;
1125 }
1126 Some(Err(error)) => {
1127 return Err(format!("inbox dispatch task join failed: {error}").into());
1128 }
1129 None => {}
1130 }
1131 }
1132 _ = tokio::time::sleep(Duration::from_millis(25)), if tasks.len() >= pump_config.max_outstanding => {
1133 record_pump_metrics(&metrics_registry, &event_log, &topic, stats.last_seen, tasks.len()).await?;
1134 }
1135 received = stream.next(), if tasks.len() < pump_config.max_outstanding => {
1136 let Some(received) = received else {
1137 break;
1138 };
1139 let (event_id, logged) = received
1140 .map_err(|error| format!("topic pump read failed for {topic}: {error}"))?;
1141 if logged.kind != "event_ingested" {
1142 stats.last_seen = event_id;
1143 event_log
1144 .ack(&topic, &consumer, event_id)
1145 .await
1146 .map_err(|error| format!("failed to ack topic pump cursor for {topic}: {error}"))?;
1147 record_pump_metrics(&metrics_registry, &event_log, &topic, stats.last_seen, tasks.len()).await?;
1148 continue;
1149 }
1150 append_pump_lifecycle_event(
1151 &event_log,
1152 "pump_received",
1153 json!({
1154 "topic": topic.as_str(),
1155 "event_log_id": event_id,
1156 "outstanding": tasks.len(),
1157 "max_outstanding": pump_config.max_outstanding,
1158 }),
1159 )
1160 .await?;
1161 let envelope: harn_vm::triggers::dispatcher::InboxEnvelope =
1162 serde_json::from_value(logged.payload)
1163 .map_err(|error| format!("failed to decode dispatcher inbox event: {error}"))?;
1164 let trigger_id = envelope.trigger_id.clone();
1165 let binding_version = envelope.binding_version;
1166 let trigger_event_id = envelope.event.id.0.clone();
1167 let parent_headers = logged.headers.clone();
1168 append_pump_lifecycle_event(
1169 &event_log,
1170 "pump_eligible",
1171 json!({
1172 "topic": topic.as_str(),
1173 "event_log_id": event_id,
1174 "trigger_id": trigger_id.clone(),
1175 "binding_version": binding_version,
1176 "trigger_event_id": trigger_event_id,
1177 }),
1178 )
1179 .await?;
1180 metrics_registry.record_orchestrator_pump_admission_delay(
1181 topic.as_str(),
1182 admission_delay(logged.occurred_at_ms),
1183 );
1184 append_pump_lifecycle_event(
1185 &event_log,
1186 "pump_admitted",
1187 json!({
1188 "topic": topic.as_str(),
1189 "event_log_id": event_id,
1190 "outstanding_after_admit": tasks.len() + 1,
1191 "max_outstanding": pump_config.max_outstanding,
1192 "trigger_id": trigger_id.clone(),
1193 "binding_version": binding_version,
1194 "trigger_event_id": trigger_event_id,
1195 }),
1196 )
1197 .await?;
1198 let dispatcher = dispatcher.clone();
1199 let task_event_log = event_log.clone();
1200 let task_topic = topic.as_str().to_string();
1201 let inbox_task_release_file = inbox_task_release_file.clone();
1202 tasks.spawn_local(async move {
1203 if let Some(path) = inbox_task_release_file.as_ref() {
1204 wait_for_test_release_file(path).await;
1205 }
1206 let _ = append_pump_lifecycle_event(
1207 &task_event_log,
1208 "pump_dispatch_started",
1209 json!({
1210 "topic": task_topic.clone(),
1211 "event_log_id": event_id,
1212 "trigger_id": trigger_id,
1213 "binding_version": binding_version,
1214 "trigger_event_id": trigger_event_id,
1215 }),
1216 )
1217 .await;
1218 let result = dispatcher
1219 .dispatch_inbox_envelope_with_parent_headers(
1220 envelope,
1221 &parent_headers,
1222 )
1223 .await;
1224 let (status, error_message) = match result {
1225 Ok(_) => ("completed", None),
1226 Err(error) => {
1227 let message = error.to_string();
1228 eprintln!("[harn] inbox dispatch warning: {message}");
1229 ("failed", Some(message))
1230 }
1231 };
1232 let _ = append_pump_lifecycle_event(
1233 &task_event_log,
1234 "pump_dispatch_completed",
1235 json!({
1236 "topic": task_topic,
1237 "event_log_id": event_id,
1238 "status": status,
1239 "error": error_message,
1240 }),
1241 )
1242 .await;
1243 });
1244 stats.last_seen = event_id;
1245 stats.processed += 1;
1246 event_log
1247 .ack(&topic, &consumer, event_id)
1248 .await
1249 .map_err(|error| format!("failed to ack topic pump cursor for {topic}: {error}"))?;
1250 append_pump_lifecycle_event(
1251 &event_log,
1252 "pump_acked",
1253 json!({
1254 "topic": topic.as_str(),
1255 "event_log_id": event_id,
1256 "cursor": event_id,
1257 }),
1258 )
1259 .await?;
1260 record_pump_metrics(&metrics_registry, &event_log, &topic, stats.last_seen, tasks.len()).await?;
1261 }
1262 }
1263 }
1264
1265 while let Some(joined) = tasks.join_next().await {
1266 joined.map_err(|error| format!("inbox dispatch task join failed: {error}"))?;
1267 record_pump_metrics(
1268 &metrics_registry,
1269 &event_log,
1270 &topic,
1271 stats.last_seen,
1272 tasks.len(),
1273 )
1274 .await?;
1275 }
1276
1277 Ok(drain_progress
1278 .map(|progress| {
1279 pump_drain_report(
1280 stats,
1281 progress.start_seen,
1282 progress.request.up_to,
1283 0,
1284 PumpDrainStopReason::Drained,
1285 )
1286 })
1287 .unwrap_or_else(|| PumpDrainReport {
1288 stats,
1289 drain_items: 0,
1290 remaining_queued: 0,
1291 outstanding_tasks: 0,
1292 stop_reason: PumpDrainStopReason::Drained,
1293 }))
1294 });
1295 Ok(PumpHandle { mode_tx, join })
1296}
1297
1298fn spawn_waitpoint_resume_pump(
1299 event_log: Arc<harn_vm::event_log::AnyEventLog>,
1300 dispatcher: harn_vm::Dispatcher,
1301 pump_config: PumpConfig,
1302 metrics_registry: Arc<harn_vm::MetricsRegistry>,
1303 pump_drain_gate: PumpDrainGate,
1304) -> Result<PumpHandle, OrchestratorError> {
1305 let topic = harn_vm::event_log::Topic::new(harn_vm::WAITPOINT_RESUME_TOPIC)
1306 .map_err(|error| error.to_string())?;
1307 spawn_topic_pump(
1308 event_log,
1309 topic,
1310 pump_config,
1311 metrics_registry,
1312 pump_drain_gate,
1313 move |logged| {
1314 let dispatcher = dispatcher.clone();
1315 async move {
1316 harn_vm::process_waitpoint_resume_event(&dispatcher, logged)
1317 .await
1318 .map_err(OrchestratorError::from)
1319 }
1320 },
1321 )
1322}
1323
1324fn spawn_waitpoint_cancel_pump(
1325 event_log: Arc<harn_vm::event_log::AnyEventLog>,
1326 dispatcher: harn_vm::Dispatcher,
1327 pump_config: PumpConfig,
1328 metrics_registry: Arc<harn_vm::MetricsRegistry>,
1329 pump_drain_gate: PumpDrainGate,
1330) -> Result<PumpHandle, OrchestratorError> {
1331 let topic = harn_vm::event_log::Topic::new(harn_vm::TRIGGER_CANCEL_REQUESTS_TOPIC)
1332 .map_err(|error| error.to_string())?;
1333 spawn_topic_pump(
1334 event_log,
1335 topic,
1336 pump_config,
1337 metrics_registry,
1338 pump_drain_gate,
1339 move |logged| {
1340 let dispatcher = dispatcher.clone();
1341 async move {
1342 if logged.kind != "dispatch_cancel_requested" {
1343 return Ok(false);
1344 }
1345 harn_vm::service_waitpoints_once(&dispatcher, None)
1346 .await
1347 .map_err(|error| {
1348 OrchestratorError::Serve(format!(
1349 "failed to service waitpoints on cancel: {error}"
1350 ))
1351 })?;
1352 Ok(true)
1353 }
1354 },
1355 )
1356}
1357
1358fn spawn_waitpoint_sweeper(dispatcher: harn_vm::Dispatcher) -> WaitpointSweepHandle {
1359 let (stop_tx, mut stop_rx) = watch::channel(false);
1360 let join = tokio::task::spawn_local(async move {
1361 let mut interval = tokio::time::interval(WAITPOINT_SERVICE_INTERVAL);
1362 interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
1363 loop {
1364 tokio::select! {
1365 changed = stop_rx.changed() => {
1366 if changed.is_err() || *stop_rx.borrow() {
1367 break;
1368 }
1369 }
1370 _ = interval.tick() => {
1371 harn_vm::service_waitpoints_once(&dispatcher, None)
1372 .await
1373 .map_err(|error| format!("failed to service waitpoints on sweep: {error}"))?;
1374 }
1375 }
1376 }
1377 Ok(())
1378 });
1379 WaitpointSweepHandle { stop_tx, join }
1380}
1381
1382fn spawn_topic_pump<F, Fut>(
1383 event_log: Arc<harn_vm::event_log::AnyEventLog>,
1384 topic: harn_vm::event_log::Topic,
1385 _pump_config: PumpConfig,
1386 metrics_registry: Arc<harn_vm::MetricsRegistry>,
1387 pump_drain_gate: PumpDrainGate,
1388 process: F,
1389) -> Result<PumpHandle, OrchestratorError>
1390where
1391 F: Fn(harn_vm::event_log::LogEvent) -> Fut + 'static,
1392 Fut: std::future::Future<Output = Result<bool, OrchestratorError>> + 'static,
1393{
1394 let consumer = pump_consumer_id(&topic)?;
1395 let mut pump_drain_gate_rx = pump_drain_gate.subscribe();
1396 let (mode_tx, mut mode_rx) = watch::channel(PumpMode::Running);
1397 let join = tokio::task::spawn_local(async move {
1398 let start_from = event_log
1399 .consumer_cursor(&topic, &consumer)
1400 .await
1401 .map_err(|error| format!("failed to read consumer cursor for {topic}: {error}"))?
1402 .or(event_log
1403 .latest(&topic)
1404 .await
1405 .map_err(|error| format!("failed to read topic head {topic}: {error}"))?);
1406 let mut stream = event_log
1407 .clone()
1408 .subscribe(&topic, start_from)
1409 .await
1410 .map_err(|error| format!("failed to subscribe topic {topic}: {error}"))?;
1411 let mut stats = PumpStats {
1412 last_seen: start_from.unwrap_or(0),
1413 processed: 0,
1414 };
1415 record_pump_metrics(&metrics_registry, &event_log, &topic, stats.last_seen, 0).await?;
1416 let mut drain_progress = None;
1417 loop {
1418 if let Some(progress) = drain_progress {
1419 if let Some(report) = maybe_finish_pump_drain(stats, progress, 0) {
1420 return Ok(report);
1421 }
1422 }
1423 let deadline = drain_progress.map(|progress| progress.request.deadline);
1424 let mut deadline_wait = Box::pin(async move {
1425 if let Some(deadline) = deadline {
1426 tokio::time::sleep_until(deadline).await;
1427 } else {
1428 std::future::pending::<()>().await;
1429 }
1430 });
1431 tokio::select! {
1432 changed = mode_rx.changed() => {
1433 if changed.is_err() {
1434 break;
1435 }
1436 if let PumpMode::Draining(request) = *mode_rx.borrow() {
1437 drain_progress.get_or_insert(PumpDrainProgress {
1438 request,
1439 start_seen: stats.last_seen,
1440 });
1441 }
1442 }
1443 _ = &mut deadline_wait => {
1444 if let Some(progress) = drain_progress {
1445 return Ok(pump_drain_report(
1446 stats,
1447 progress.start_seen,
1448 progress.request.up_to,
1449 0,
1450 PumpDrainStopReason::Deadline,
1451 ));
1452 }
1453 }
1454 received = stream.next() => {
1455 let Some(received) = received else {
1456 break;
1457 };
1458 let (event_id, logged) = received
1459 .map_err(|error| format!("topic pump read failed for {topic}: {error}"))?;
1460 record_pump_metrics(&metrics_registry, &event_log, &topic, stats.last_seen, 1).await?;
1461 metrics_registry.record_orchestrator_pump_admission_delay(
1462 topic.as_str(),
1463 admission_delay(logged.occurred_at_ms),
1464 );
1465 wait_for_pump_drain_release(
1466 &event_log,
1467 &topic,
1468 event_id,
1469 &mut pump_drain_gate_rx,
1470 )
1471 .await?;
1472 let handled = process(logged).await?;
1473 stats.last_seen = event_id;
1474 if handled {
1475 stats.processed += 1;
1476 }
1477 event_log
1478 .ack(&topic, &consumer, event_id)
1479 .await
1480 .map_err(|error| format!("failed to ack topic pump cursor for {topic}: {error}"))?;
1481 record_pump_metrics(&metrics_registry, &event_log, &topic, stats.last_seen, 0).await?;
1482 }
1483 }
1484 }
1485 Ok(drain_progress
1486 .map(|progress| {
1487 pump_drain_report(
1488 stats,
1489 progress.start_seen,
1490 progress.request.up_to,
1491 0,
1492 PumpDrainStopReason::Drained,
1493 )
1494 })
1495 .unwrap_or_else(|| PumpDrainReport {
1496 stats,
1497 drain_items: 0,
1498 remaining_queued: 0,
1499 outstanding_tasks: 0,
1500 stop_reason: PumpDrainStopReason::Drained,
1501 }))
1502 });
1503 Ok(PumpHandle { mode_tx, join })
1504}
1505
1506#[allow(clippy::too_many_arguments)]
1509async fn graceful_shutdown(
1510 ctx: GracefulShutdownCtx<'_>,
1511 listener: ListenerRuntime,
1512 dispatcher: harn_vm::Dispatcher,
1513 pending_pumps: Vec<(String, PumpHandle)>,
1514 cron_pump: PumpHandle,
1515 inbox_pumps: Vec<(String, PumpHandle)>,
1516 waitpoint_pump: PumpHandle,
1517 waitpoint_cancel_pump: PumpHandle,
1518 waitpoint_sweeper: WaitpointSweepHandle,
1519) -> Result<(), OrchestratorError> {
1520 eprintln!("[harn] signal received, starting graceful shutdown...");
1521 tracing::info!(
1522 component = "orchestrator",
1523 trace_id = "",
1524 shutdown_timeout_secs = ctx.shutdown_timeout.as_secs(),
1525 "signal received, starting graceful shutdown"
1526 );
1527 let listener_in_flight = listener
1528 .trigger_metrics()
1529 .into_values()
1530 .map(|metrics| metrics.in_flight)
1531 .sum::<u64>();
1532 let dispatcher_before = dispatcher.snapshot();
1533 append_lifecycle_event(
1534 ctx.event_log,
1535 "draining",
1536 json!({
1537 "bind": ctx.bind.to_string(),
1538 "role": ctx.role.as_str(),
1539 "status": "draining",
1540 "http_in_flight": listener_in_flight,
1541 "dispatcher_in_flight": dispatcher_before.in_flight,
1542 "dispatcher_retry_queue_depth": dispatcher_before.retry_queue_depth,
1543 "dispatcher_dlq_depth": dispatcher_before.dlq_depth,
1544 "shutdown_timeout_secs": ctx.shutdown_timeout.as_secs(),
1545 "drain_max_items": ctx.drain_config.max_items,
1546 "drain_deadline_secs": ctx.drain_config.deadline.as_secs(),
1547 }),
1548 )
1549 .await?;
1550
1551 let deadline = tokio::time::Instant::now() + ctx.shutdown_timeout;
1552 let listener_metrics = listener.shutdown(remaining_budget(deadline)).await?;
1553 for handle in &ctx.connectors.handles {
1554 let connector = handle.lock().await;
1555 if let Err(error) = connector.shutdown(remaining_budget(deadline)).await {
1556 eprintln!(
1557 "[harn] connector {} shutdown warning: {error}",
1558 connector.provider_id().as_str()
1559 );
1560 }
1561 }
1562
1563 let mut pending_processed = 0;
1564 for (topic_name, pump) in pending_pumps {
1565 let stats =
1566 drain_pump_best_effort(ctx.event_log, &topic_name, pump, ctx.drain_config, deadline)
1567 .await?;
1568 pending_processed += stats.stats.processed;
1569 emit_drain_truncated(ctx.event_log, &topic_name, stats, ctx.drain_config).await?;
1570 }
1571 let cron_stats = drain_pump_best_effort(
1572 ctx.event_log,
1573 CRON_TICK_TOPIC,
1574 cron_pump,
1575 ctx.drain_config,
1576 deadline,
1577 )
1578 .await?;
1579 emit_drain_truncated(ctx.event_log, CRON_TICK_TOPIC, cron_stats, ctx.drain_config).await?;
1580 let mut inbox_processed = 0;
1581 for (topic_name, pump) in inbox_pumps {
1582 let stats =
1583 drain_pump_best_effort(ctx.event_log, &topic_name, pump, ctx.drain_config, deadline)
1584 .await?;
1585 inbox_processed += stats.stats.processed;
1586 emit_drain_truncated(ctx.event_log, &topic_name, stats, ctx.drain_config).await?;
1587 }
1588 let waitpoint_stats = waitpoint_pump
1589 .drain(
1590 ctx.event_log,
1591 harn_vm::WAITPOINT_RESUME_TOPIC,
1592 topic_latest_id(ctx.event_log, harn_vm::WAITPOINT_RESUME_TOPIC).await?,
1593 ctx.drain_config,
1594 deadline,
1595 )
1596 .await?;
1597 emit_drain_truncated(
1598 ctx.event_log,
1599 harn_vm::WAITPOINT_RESUME_TOPIC,
1600 waitpoint_stats,
1601 ctx.drain_config,
1602 )
1603 .await?;
1604 let waitpoint_cancel_stats = waitpoint_cancel_pump
1605 .drain(
1606 ctx.event_log,
1607 harn_vm::TRIGGER_CANCEL_REQUESTS_TOPIC,
1608 topic_latest_id(ctx.event_log, harn_vm::TRIGGER_CANCEL_REQUESTS_TOPIC).await?,
1609 ctx.drain_config,
1610 deadline,
1611 )
1612 .await?;
1613 emit_drain_truncated(
1614 ctx.event_log,
1615 harn_vm::TRIGGER_CANCEL_REQUESTS_TOPIC,
1616 waitpoint_cancel_stats,
1617 ctx.drain_config,
1618 )
1619 .await?;
1620 waitpoint_sweeper.shutdown().await?;
1621 let drain_report = dispatcher
1622 .drain(remaining_budget(deadline))
1623 .await
1624 .map_err(|error| format!("failed to drain dispatcher: {error}"))?;
1625
1626 let stopped_at = now_rfc3339()?;
1627 let timed_out = !drain_report.drained;
1628 if timed_out {
1629 dispatcher.shutdown();
1630 }
1631 append_lifecycle_event(
1632 ctx.event_log,
1633 "stopped",
1634 json!({
1635 "bind": ctx.bind.to_string(),
1636 "role": ctx.role.as_str(),
1637 "status": "stopped",
1638 "http_in_flight": listener_in_flight,
1639 "dispatcher_in_flight": drain_report.in_flight,
1640 "dispatcher_retry_queue_depth": drain_report.retry_queue_depth,
1641 "dispatcher_dlq_depth": drain_report.dlq_depth,
1642 "pending_events_drained": pending_processed,
1643 "cron_events_drained": cron_stats.stats.processed,
1644 "inbox_events_drained": inbox_processed,
1645 "waitpoint_events_drained": waitpoint_stats.stats.processed,
1646 "waitpoint_cancel_events_drained": waitpoint_cancel_stats.stats.processed,
1647 "timed_out": timed_out,
1648 }),
1649 )
1650 .await?;
1651 ctx.event_log
1652 .flush()
1653 .await
1654 .map_err(|error| format!("failed to flush event log: {error}"))?;
1655
1656 write_state_snapshot(
1657 &ctx.state_dir.join(STATE_SNAPSHOT_FILE),
1658 &ServeStateSnapshot {
1659 status: "stopped".to_string(),
1660 role: ctx.role.as_str().to_string(),
1661 bind: ctx.bind.to_string(),
1662 listener_url: ctx.listener_url.clone(),
1663 manifest_path: ctx.config_path.display().to_string(),
1664 state_dir: ctx.state_dir.display().to_string(),
1665 started_at: ctx.startup_started_at.to_string(),
1666 stopped_at: Some(stopped_at),
1667 secret_provider_chain: ctx.secret_chain_display.to_string(),
1668 event_log_backend: ctx.event_log_description.backend.to_string(),
1669 event_log_location: ctx
1670 .event_log_description
1671 .location
1672 .as_ref()
1673 .map(|path| path.display().to_string()),
1674 triggers: trigger_state_snapshots(ctx.triggers, &listener_metrics),
1675 connectors: ctx.connectors.providers.clone(),
1676 activations: ctx
1677 .connectors
1678 .activations
1679 .iter()
1680 .map(|activation| ConnectorActivationSnapshot {
1681 provider: activation.provider.as_str().to_string(),
1682 binding_count: activation.binding_count,
1683 })
1684 .collect(),
1685 },
1686 )?;
1687
1688 if timed_out {
1689 eprintln!(
1690 "[harn] graceful shutdown timed out with {} dispatches and {} retry waits remaining",
1691 drain_report.in_flight, drain_report.retry_queue_depth
1692 );
1693 }
1694 eprintln!("[harn] graceful shutdown complete");
1695 tracing::info!(
1696 component = "orchestrator",
1697 trace_id = "",
1698 "graceful shutdown complete"
1699 );
1700 Ok(())
1701}
1702
1703struct GracefulShutdownCtx<'a> {
1704 role: OrchestratorRole,
1705 bind: SocketAddr,
1706 listener_url: String,
1707 config_path: &'a Path,
1708 state_dir: &'a Path,
1709 startup_started_at: &'a str,
1710 event_log: &'a Arc<AnyEventLog>,
1711 event_log_description: &'a harn_vm::event_log::EventLogDescription,
1712 secret_chain_display: &'a str,
1713 triggers: &'a [CollectedManifestTrigger],
1714 connectors: &'a ConnectorRuntime,
1715 shutdown_timeout: Duration,
1716 drain_config: DrainConfig,
1717}
1718
1719struct RuntimeCtx<'a> {
1722 role: OrchestratorRole,
1723 config_path: &'a Path,
1724 state_dir: &'a Path,
1725 bind: SocketAddr,
1726 startup_started_at: &'a str,
1727 event_log: &'a Arc<AnyEventLog>,
1728 event_log_description: &'a harn_vm::event_log::EventLogDescription,
1729 secret_chain_display: &'a str,
1730 listener: &'a ListenerRuntime,
1731 connectors: &'a mut ConnectorRuntime,
1732 live_manifest: &'a mut Manifest,
1733 live_triggers: &'a mut Vec<CollectedManifestTrigger>,
1734 secret_provider: &'a Arc<dyn harn_vm::secrets::SecretProvider>,
1735 metrics_registry: &'a Arc<harn_vm::MetricsRegistry>,
1736 mcp_service: Option<&'a Arc<crate::commands::mcp::serve::McpOrchestratorService>>,
1737 #[cfg_attr(not(unix), allow(dead_code))]
1738 reload_rx: &'a mut mpsc::UnboundedReceiver<AdminReloadRequest>,
1739}
1740
1741#[cfg_attr(not(unix), allow(dead_code))]
1742async fn handle_reload_request(
1743 ctx: &mut RuntimeCtx<'_>,
1744 request: AdminReloadRequest,
1745) -> Result<(), OrchestratorError> {
1746 let source = request.source.clone();
1747 match reload_manifest(ctx).await {
1748 Ok(summary) => {
1749 if let Some(mcp_service) = ctx.mcp_service {
1750 mcp_service.notify_manifest_reloaded();
1751 }
1752 write_running_state_snapshot(ctx)?;
1753 append_manifest_event(
1754 ctx.event_log,
1755 "reload_succeeded",
1756 json!({
1757 "source": source,
1758 "summary": summary,
1759 }),
1760 )
1761 .await?;
1762 eprintln!(
1763 "[harn] manifest reload ({source}) applied: +{} ~{} -{}",
1764 summary.added.len(),
1765 summary.modified.len(),
1766 summary.removed.len()
1767 );
1768 if let Some(response_tx) = request.response_tx {
1769 let _ = response_tx.send(serde_json::to_value(&summary).map_err(|error| {
1770 OrchestratorError::Serve(format!("failed to encode reload summary: {error}"))
1771 }));
1772 }
1773 }
1774 Err(error) => {
1775 eprintln!("[harn] manifest reload ({source}) failed: {error}");
1776 append_manifest_event(
1777 ctx.event_log,
1778 "reload_failed",
1779 json!({
1780 "source": source,
1781 "error": error.to_string(),
1782 }),
1783 )
1784 .await?;
1785 if let Some(response_tx) = request.response_tx {
1786 let _ = response_tx.send(Err(error));
1787 }
1788 }
1789 }
1790 Ok(())
1791}
1792
1793#[cfg_attr(not(unix), allow(dead_code))]
1794fn write_running_state_snapshot(ctx: &RuntimeCtx<'_>) -> Result<(), OrchestratorError> {
1795 let listener_metrics = ctx.listener.trigger_metrics();
1796 write_state_snapshot(
1797 &ctx.state_dir.join(STATE_SNAPSHOT_FILE),
1798 &ServeStateSnapshot {
1799 status: "running".to_string(),
1800 role: ctx.role.as_str().to_string(),
1801 bind: ctx.bind.to_string(),
1802 listener_url: ctx.listener.url(),
1803 manifest_path: ctx.config_path.display().to_string(),
1804 state_dir: ctx.state_dir.display().to_string(),
1805 started_at: ctx.startup_started_at.to_string(),
1806 stopped_at: None,
1807 secret_provider_chain: ctx.secret_chain_display.to_string(),
1808 event_log_backend: ctx.event_log_description.backend.to_string(),
1809 event_log_location: ctx
1810 .event_log_description
1811 .location
1812 .as_ref()
1813 .map(|path| path.display().to_string()),
1814 triggers: trigger_state_snapshots(ctx.live_triggers, &listener_metrics),
1815 connectors: ctx.connectors.providers.clone(),
1816 activations: ctx
1817 .connectors
1818 .activations
1819 .iter()
1820 .map(|activation| ConnectorActivationSnapshot {
1821 provider: activation.provider.as_str().to_string(),
1822 binding_count: activation.binding_count,
1823 })
1824 .collect(),
1825 },
1826 )
1827}
1828
#[cfg_attr(not(unix), allow(dead_code))]
async fn reload_manifest(
    ctx: &mut RuntimeCtx<'_>,
) -> Result<ManifestReloadSummary, OrchestratorError> {
    let (manifest, manifest_dir) = load_manifest(ctx.config_path)?;
    let mut vm = ctx
        .role
        .build_vm(&manifest_dir, &manifest_dir, ctx.state_dir)?;
    let extensions = package::load_runtime_extensions(ctx.config_path);
    let collected_triggers = package::collect_manifest_triggers(&mut vm, &extensions)
        .await
        .map_err(|error| format!("failed to collect manifest triggers: {error}"))?;
    let summary = summarize_manifest_reload(ctx.live_triggers, &collected_triggers);
    let connector_reload =
        connector_reload_fingerprint_map(ctx.live_triggers, &ctx.connectors.provider_overrides)
            != connector_reload_fingerprint_map(
                &collected_triggers,
                &extensions.provider_connectors,
            );
    let next_connector_runtime = if connector_reload {
        let mut runtime = initialize_connectors(
            &collected_triggers,
            ctx.event_log.clone(),
            ctx.secret_provider.clone(),
            ctx.metrics_registry.clone(),
            &extensions.provider_connectors,
        )
        .await?;
        runtime.activations = runtime
            .registry
            .activate_all(&runtime.trigger_registry)
            .await
            .map_err(|error| error.to_string())?;
        Some(runtime)
    } else {
        None
    };
    let previous_manifest = ctx.live_manifest.clone();
    let previous_triggers = ctx.live_triggers.clone();
    package::install_collected_manifest_triggers(&collected_triggers).await?;
    apply_supervisor_state(ctx.state_dir).await?;
    let binding_versions = live_manifest_binding_versions();
    let route_registry = next_connector_runtime
        .as_ref()
        .map(|runtime| &runtime.registry)
        .unwrap_or(&ctx.connectors.registry);
    let route_overrides = next_connector_runtime
        .as_ref()
        .map(|runtime| runtime.provider_overrides.as_slice())
        .unwrap_or(ctx.connectors.provider_overrides.as_slice());
    let route_configs = match build_route_configs(&collected_triggers, &binding_versions)
        .and_then(|routes| attach_route_connectors(routes, route_registry, route_overrides))
    {
        Ok(routes) => routes,
        Err(error) => {
            rollback_manifest_reload(ctx, &previous_manifest, &previous_triggers)
                .await
                .map_err(|rollback| format!("{error}; rollback failed: {rollback}"))?;
            return Err(error);
        }
    };
    if let Err(error) = ctx.listener.reload_routes(route_configs) {
        rollback_manifest_reload(ctx, &previous_manifest, &previous_triggers)
            .await
            .map_err(|rollback| format!("{error}; rollback failed: {rollback}"))?;
        return Err(error);
    }
    if let Some(runtime) = next_connector_runtime {
        let previous_handles = ctx.connectors.handles.clone();
        let connector_clients = runtime.registry.client_map().await;
        harn_vm::install_active_connector_clients(connector_clients);
        *ctx.connectors = runtime;
        for handle in previous_handles {
            let connector = handle.lock().await;
            if let Err(error) = connector.shutdown(Duration::from_secs(5)).await {
                eprintln!(
                    "[harn] connector {} reload shutdown warning: {error}",
                    connector.provider_id().as_str()
                );
            }
        }
    }
    *ctx.live_manifest = manifest;
    *ctx.live_triggers = collected_triggers;
    Ok(summary)
}

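/// Restores the previously installed triggers, routes, and manifest after a
/// failed reload attempt.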
#[cfg_attr(not(unix), allow(dead_code))]
async fn rollback_manifest_reload(
    ctx: &mut RuntimeCtx<'_>,
    previous_manifest: &Manifest,
    previous_triggers: &[CollectedManifestTrigger],
) -> Result<(), OrchestratorError> {
    package::install_collected_manifest_triggers(previous_triggers).await?;
    apply_supervisor_state(ctx.state_dir).await?;
    let binding_versions = live_manifest_binding_versions();
    let route_configs = build_route_configs(previous_triggers, &binding_versions)?;
    let route_configs = attach_route_connectors(
        route_configs,
        &ctx.connectors.registry,
        &ctx.connectors.provider_overrides,
    )?;
    ctx.listener.reload_routes(route_configs)?;
    *ctx.live_manifest = previous_manifest.clone();
    *ctx.live_triggers = previous_triggers.to_vec();
    Ok(())
}

1936
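/// Diffs the currently installed triggers against the freshly collected set,
/// classifying each trigger id as added, removed, modified, or unchanged by
/// comparing definition fingerprints.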
#[cfg_attr(not(unix), allow(dead_code))]
fn summarize_manifest_reload(
    current: &[CollectedManifestTrigger],
    next: &[CollectedManifestTrigger],
) -> ManifestReloadSummary {
    let current_map = trigger_fingerprint_map(current, true);
    let next_map = trigger_fingerprint_map(next, true);
    let mut summary = ManifestReloadSummary::default();
    let ids: BTreeSet<String> = current_map.keys().chain(next_map.keys()).cloned().collect();
    for id in ids {
        match (current_map.get(&id), next_map.get(&id)) {
            (None, Some(_)) => summary.added.push(id),
            (Some(_), None) => summary.removed.push(id),
            (Some(left), Some(right)) if left == right => summary.unchanged.push(id),
            (Some(_), Some(_)) => summary.modified.push(id),
            (None, None) => {}
        }
    }
    summary
}

#[cfg_attr(not(unix), allow(dead_code))]
fn trigger_fingerprint_map(
    triggers: &[CollectedManifestTrigger],
    include_http_managed: bool,
) -> BTreeMap<String, String> {
    triggers
        .iter()
        .filter(|trigger| include_http_managed || !is_http_managed_trigger(trigger))
        .map(|trigger| {
            let spec = package::manifest_trigger_binding_spec(trigger.clone());
            (trigger.config.id.clone(), spec.definition_fingerprint)
        })
        .collect()
}

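/// Builds a per-provider list of fingerprints used to decide whether the
/// connector runtime must be rebuilt on reload; webhook and a2a-push triggers
/// are skipped unless a Harn connector owns ingress for their provider.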
#[cfg_attr(not(unix), allow(dead_code))]
fn connector_reload_fingerprint_map(
    triggers: &[CollectedManifestTrigger],
    provider_overrides: &[ResolvedProviderConnectorConfig],
) -> BTreeMap<String, Vec<String>> {
    let mut by_provider = BTreeMap::<String, Vec<String>>::new();
    for trigger in triggers {
        let provider = trigger.config.provider.as_str().to_string();
        if !connector_owns_ingress(&provider, provider_overrides)
            && matches!(
                trigger.config.kind,
                crate::package::TriggerKind::Webhook | crate::package::TriggerKind::A2aPush
            )
        {
            continue;
        }
        let spec = package::manifest_trigger_binding_spec(trigger.clone());
        by_provider
            .entry(provider)
            .or_default()
            .push(spec.definition_fingerprint);
    }
    for override_config in provider_overrides {
        by_provider
            .entry(override_config.id.as_str().to_string())
            .or_default()
            .push(provider_connector_fingerprint(override_config));
    }
    for fingerprints in by_provider.values_mut() {
        fingerprints.sort();
    }
    by_provider
}

#[cfg_attr(not(unix), allow(dead_code))]
fn provider_connector_fingerprint(config: &ResolvedProviderConnectorConfig) -> String {
    match &config.connector {
        ResolvedProviderConnectorKind::RustBuiltin => format!(
            "{}::builtin@{}",
            config.id.as_str(),
            config.manifest_dir.display()
        ),
        ResolvedProviderConnectorKind::Harn { module } => format!(
            "{}::harn:{}@{}",
            config.id.as_str(),
            module,
            config.manifest_dir.display()
        ),
        ResolvedProviderConnectorKind::Invalid(message) => format!(
            "{}::invalid:{}@{}",
            config.id.as_str(),
            message,
            config.manifest_dir.display()
        ),
    }
}

#[cfg_attr(not(unix), allow(dead_code))]
fn is_http_managed_trigger(trigger: &CollectedManifestTrigger) -> bool {
    matches!(
        trigger.config.kind,
        crate::package::TriggerKind::Webhook | crate::package::TriggerKind::A2aPush
    )
}

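/// Builds the connector runtime for the collected triggers: groups trigger
/// bindings by provider, resolves a connector per provider (manifest override,
/// built-in, or placeholder), and initializes each connector with a shared
/// `ConnectorCtx`.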
async fn initialize_connectors(
    triggers: &[CollectedManifestTrigger],
    event_log: Arc<harn_vm::event_log::AnyEventLog>,
    secrets: Arc<dyn harn_vm::secrets::SecretProvider>,
    metrics: Arc<harn_vm::MetricsRegistry>,
    provider_overrides: &[ResolvedProviderConnectorConfig],
) -> Result<ConnectorRuntime, OrchestratorError> {
    let mut registry = harn_vm::ConnectorRegistry::default();
    let mut trigger_registry = harn_vm::TriggerRegistry::default();
    let mut grouped_kinds: BTreeMap<harn_vm::ProviderId, BTreeSet<String>> = BTreeMap::new();

    for trigger in triggers {
        let binding = trigger_binding_for(&trigger.config)?;
        grouped_kinds
            .entry(binding.provider.clone())
            .or_default()
            .insert(binding.kind.as_str().to_string());
        trigger_registry.register(binding);
    }

    let ctx = harn_vm::ConnectorCtx {
        inbox: Arc::new(
            harn_vm::InboxIndex::new(event_log.clone(), metrics.clone())
                .await
                .map_err(|error| error.to_string())?,
        ),
        event_log,
        secrets,
        metrics,
        rate_limiter: Arc::new(harn_vm::RateLimiterFactory::default()),
    };

    let mut providers = Vec::new();
    let mut handles = Vec::new();
    for (provider, kinds) in grouped_kinds {
        let provider_name = provider.as_str().to_string();
        if let Some(connector) = connector_override_for(&provider, provider_overrides).await? {
            registry.remove(&provider);
            registry
                .register(connector)
                .map_err(|error| error.to_string())?;
        }
        if registry.get(&provider).is_none() {
            if provider_requires_harn_connector(provider.as_str()) {
                return Err(format!(
                    "provider '{}' is package-backed; add [[providers]] id = \"{}\" with \
                     connector = {{ harn = \"...\" }} to the manifest",
                    provider.as_str(),
                    provider.as_str()
                )
                .into());
            }
            let connector = connector_for(&provider, kinds);
            registry
                .register(connector)
                .map_err(|error| error.to_string())?;
        }
        let handle = registry
            .get(&provider)
            .ok_or_else(|| format!("connector registry lost provider '{}'", provider.as_str()))?;
        handle
            .lock()
            .await
            .init(ctx.clone())
            .await
            .map_err(|error| error.to_string())?;
        handles.push(handle.clone());
        providers.push(provider_name);
    }

    Ok(ConnectorRuntime {
        registry,
        trigger_registry,
        handles,
        providers,
        activations: Vec::new(),
        provider_overrides: provider_overrides.to_vec(),
    })
}

fn trigger_binding_for(
    config: &ResolvedTriggerConfig,
) -> Result<harn_vm::TriggerBinding, OrchestratorError> {
    Ok(harn_vm::TriggerBinding {
        provider: config.provider.clone(),
        kind: harn_vm::TriggerKind::from(trigger_kind_name(config.kind)),
        binding_id: config.id.clone(),
        dedupe_key: config.dedupe_key.clone(),
        dedupe_retention_days: config.retry.retention_days,
        config: connector_binding_config(config)?,
    })
}

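/// Encodes the kind-specific portion of a trigger config into the JSON shape
/// its connector expects (cron, webhook, poll, stream, or a2a_push); other
/// kinds carry no connector config and encode to JSON null.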
fn connector_binding_config(
    config: &ResolvedTriggerConfig,
) -> Result<JsonValue, OrchestratorError> {
    match config.kind {
        crate::package::TriggerKind::Cron => {
            serde_json::to_value(&config.kind_specific).map_err(|error| {
                OrchestratorError::Serve(format!(
                    "failed to encode cron trigger config '{}': {error}",
                    config.id
                ))
            })
        }
        crate::package::TriggerKind::Webhook => Ok(serde_json::json!({
            "match": config.match_,
            "secrets": config.secrets,
            "webhook": config.kind_specific,
        })),
        crate::package::TriggerKind::Poll => Ok(serde_json::json!({
            "match": config.match_,
            "secrets": config.secrets,
            "poll": config.kind_specific,
        })),
        crate::package::TriggerKind::Stream => Ok(serde_json::json!({
            "match": config.match_,
            "secrets": config.secrets,
            "stream": config.kind_specific,
            "window": config.window,
        })),
        crate::package::TriggerKind::A2aPush => Ok(serde_json::json!({
            "match": config.match_,
            "secrets": config.secrets,
            "a2a_push": a2a_push_connector_config(&config.kind_specific)?,
        })),
        _ => Ok(JsonValue::Null),
    }
}

fn a2a_push_connector_config(
    kind_specific: &BTreeMap<String, toml::Value>,
) -> Result<JsonValue, OrchestratorError> {
    if let Some(nested) = kind_specific.get("a2a_push") {
        return serde_json::to_value(nested).map_err(|error| {
            OrchestratorError::Serve(format!("failed to encode a2a_push trigger config: {error}"))
        });
    }
    let filtered = kind_specific
        .iter()
        .filter(|(key, _)| key.as_str() != "path")
        .map(|(key, value)| (key.clone(), value.clone()))
        .collect::<BTreeMap<_, _>>();
    serde_json::to_value(filtered).map_err(|error| {
        OrchestratorError::Serve(format!("failed to encode a2a_push trigger config: {error}"))
    })
}

fn connector_for(
    provider: &harn_vm::ProviderId,
    kinds: BTreeSet<String>,
) -> Box<dyn harn_vm::Connector> {
    match provider.as_str() {
        "cron" => Box::new(harn_vm::CronConnector::new()),
        _ => Box::new(PlaceholderConnector::new(provider.clone(), kinds)),
    }
}

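/// Resolves a manifest-level provider override: Harn-backed overrides load the
/// connector module relative to the manifest directory, built-in overrides
/// fall through to default registration, and invalid overrides are surfaced as
/// errors.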
async fn connector_override_for(
    provider: &harn_vm::ProviderId,
    provider_overrides: &[ResolvedProviderConnectorConfig],
) -> Result<Option<Box<dyn harn_vm::Connector>>, OrchestratorError> {
    let Some(override_config) = provider_overrides
        .iter()
        .find(|entry| entry.id == *provider)
    else {
        return Ok(None);
    };
    match &override_config.connector {
        ResolvedProviderConnectorKind::RustBuiltin => Ok(None),
        ResolvedProviderConnectorKind::Invalid(message) => {
            Err(OrchestratorError::Serve(message.clone()))
        }
        ResolvedProviderConnectorKind::Harn { module } => {
            let module_path =
                harn_vm::resolve_module_import_path(&override_config.manifest_dir, module);
            let connector = harn_vm::HarnConnector::load(&module_path)
                .await
                .map_err(|error| {
                    format!(
                        "failed to load Harn connector '{}' for provider '{}': {error}",
                        module_path.display(),
                        provider.as_str()
                    )
                })?;
            Ok(Some(Box::new(connector)))
        }
    }
}

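/// Translates collected triggers into listener route configs, rejecting
/// triggers without an active manifest binding and paths configured more than
/// once.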
fn build_route_configs(
    triggers: &[CollectedManifestTrigger],
    binding_versions: &BTreeMap<String, u32>,
) -> Result<Vec<RouteConfig>, OrchestratorError> {
    let mut seen_paths = BTreeSet::new();
    let mut routes = Vec::new();
    for trigger in triggers {
        let Some(binding_version) = binding_versions.get(&trigger.config.id).copied() else {
            return Err(format!(
                "trigger registry is missing active manifest binding '{}'",
                trigger.config.id
            )
            .into());
        };
        if let Some(route) = RouteConfig::from_trigger(trigger, binding_version)? {
            if !seen_paths.insert(route.path.clone()) {
                return Err(format!(
                    "trigger route '{}' is configured more than once",
                    route.path
                )
                .into());
            }
            routes.push(route);
        }
    }
    Ok(routes)
}

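/// Attaches a connector handle to every route that requests connector ingress
/// or whose provider is overridden with a Harn connector, failing if the
/// registry has no connector for that provider.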
fn attach_route_connectors(
    routes: Vec<RouteConfig>,
    registry: &harn_vm::ConnectorRegistry,
    provider_overrides: &[ResolvedProviderConnectorConfig],
) -> Result<Vec<RouteConfig>, OrchestratorError> {
    routes
        .into_iter()
        .map(|mut route| {
            if route.connector_ingress
                || connector_owns_ingress(route.provider.as_str(), provider_overrides)
            {
                route.connector = Some(registry.get(&route.provider).ok_or_else(|| {
                    format!(
                        "connector registry is missing provider '{}'",
                        route.provider.as_str()
                    )
                })?);
            }
            Ok(route)
        })
        .collect()
}

fn connector_owns_ingress(
    provider: &str,
    provider_overrides: &[ResolvedProviderConnectorConfig],
) -> bool {
    provider_overrides.iter().any(|entry| {
        entry.id.as_str() == provider
            && matches!(entry.connector, ResolvedProviderConnectorKind::Harn { .. })
    })
}

fn provider_requires_harn_connector(provider: &str) -> bool {
    harn_vm::provider_metadata(provider).is_some_and(|metadata| {
        matches!(
            metadata.runtime,
            harn_vm::ProviderRuntimeMetadata::Placeholder
        )
    })
}

fn live_manifest_binding_versions() -> BTreeMap<String, u32> {
    let mut versions = BTreeMap::new();
    for binding in harn_vm::snapshot_trigger_bindings() {
        if binding.source != harn_vm::TriggerBindingSource::Manifest {
            continue;
        }
        if binding.state == harn_vm::TriggerState::Terminated {
            continue;
        }
        versions
            .entry(binding.id)
            .and_modify(|current: &mut u32| *current = (*current).max(binding.version))
            .or_insert(binding.version);
    }
    versions
}

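/// Joins collected triggers with their newest manifest binding snapshot and
/// the listener's per-trigger metrics to produce the rows recorded in the
/// orchestrator state snapshot.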
fn trigger_state_snapshots(
    triggers: &[CollectedManifestTrigger],
    listener_metrics: &BTreeMap<String, TriggerMetricSnapshot>,
) -> Vec<TriggerStateSnapshot> {
    let bindings_by_id = harn_vm::snapshot_trigger_bindings()
        .into_iter()
        .filter(|binding| binding.source == harn_vm::TriggerBindingSource::Manifest)
        .fold(
            BTreeMap::<String, harn_vm::TriggerBindingSnapshot>::new(),
            |mut acc, binding| {
                match acc.get(&binding.id) {
                    Some(current) if current.version >= binding.version => {}
                    _ => {
                        acc.insert(binding.id.clone(), binding);
                    }
                }
                acc
            },
        );

    triggers
        .iter()
        .map(|trigger| {
            let runtime = bindings_by_id.get(&trigger.config.id);
            let metrics = listener_metrics.get(&trigger.config.id);
            TriggerStateSnapshot {
                id: trigger.config.id.clone(),
                provider: trigger.config.provider.as_str().to_string(),
                kind: trigger_kind_name(trigger.config.kind).to_string(),
                handler: handler_kind(&trigger.handler).to_string(),
                version: runtime.map(|binding| binding.version),
                state: runtime.map(|binding| binding.state.as_str().to_string()),
                received: metrics.map(|value| value.received).unwrap_or(0),
                dispatched: metrics.map(|value| value.dispatched).unwrap_or(0),
                failed: metrics.map(|value| value.failed).unwrap_or(0),
                in_flight: metrics.map(|value| value.in_flight).unwrap_or(0),
            }
        })
        .collect()
}

fn format_trigger_summary(triggers: &[CollectedManifestTrigger]) -> String {
    if triggers.is_empty() {
        return "none".to_string();
    }
    triggers
        .iter()
        .map(|trigger| {
            format!(
                "{} [{}:{} -> {}]",
                trigger.config.id,
                trigger.config.provider.as_str(),
                trigger_kind_name(trigger.config.kind),
                handler_kind(&trigger.handler)
            )
        })
        .collect::<Vec<_>>()
        .join(", ")
}

fn format_activation_summary(activations: &[harn_vm::ActivationHandle]) -> String {
    if activations.is_empty() {
        return "none".to_string();
    }
    activations
        .iter()
        .map(|activation| {
            format!(
                "{}({})",
                activation.provider.as_str(),
                activation.binding_count
            )
        })
        .collect::<Vec<_>>()
        .join(", ")
}

fn handler_kind(handler: &CollectedTriggerHandler) -> &'static str {
    match handler {
        CollectedTriggerHandler::Local { .. } => "local",
        CollectedTriggerHandler::A2a { .. } => "a2a",
        CollectedTriggerHandler::Worker { .. } => "worker",
        CollectedTriggerHandler::Persona { .. } => "persona",
    }
}

fn trigger_kind_name(kind: crate::package::TriggerKind) -> &'static str {
    match kind {
        crate::package::TriggerKind::Webhook => "webhook",
        crate::package::TriggerKind::Cron => "cron",
        crate::package::TriggerKind::Poll => "poll",
        crate::package::TriggerKind::Stream => "stream",
        crate::package::TriggerKind::Predicate => "predicate",
        crate::package::TriggerKind::A2aPush => "a2a-push",
    }
}

fn has_orchestrator_api_keys_configured() -> bool {
    std::env::var("HARN_ORCHESTRATOR_API_KEYS")
        .ok()
        .is_some_and(|value| value.split(',').any(|segment| !segment.trim().is_empty()))
}

fn has_mcp_oauth_configured() -> bool {
    std::env::var("HARN_MCP_OAUTH_AUTHORIZATION_SERVERS")
        .ok()
        .is_some_and(|value| value.split(',').any(|segment| !segment.trim().is_empty()))
}

fn validate_mcp_paths(
    path: &str,
    sse_path: &str,
    messages_path: &str,
) -> Result<(), OrchestratorError> {
    let reserved = [
        "/health",
        "/healthz",
        "/readyz",
        "/metrics",
        "/admin/reload",
        "/acp",
    ];
    let mut seen = BTreeSet::new();
    for (label, value) in [
        ("--mcp-path", path),
        ("--mcp-sse-path", sse_path),
        ("--mcp-messages-path", messages_path),
    ] {
        if !value.starts_with('/') {
            return Err(format!("{label} must start with '/'").into());
        }
        if value == "/" {
            return Err(format!("{label} cannot be '/'").into());
        }
        if reserved.contains(&value) {
            return Err(format!("{label} cannot use reserved listener path '{value}'").into());
        }
        if !seen.insert(value) {
            return Err(format!("embedded MCP paths must be unique; duplicate '{value}'").into());
        }
    }
    Ok(())
}

async fn append_lifecycle_event(
    log: &Arc<AnyEventLog>,
    kind: &str,
    payload: JsonValue,
) -> Result<(), OrchestratorError> {
    let topic =
        harn_vm::event_log::Topic::new(LIFECYCLE_TOPIC).map_err(|error| error.to_string())?;
    log.append(&topic, harn_vm::event_log::LogEvent::new(kind, payload))
        .await
        .map(|_| ())
        .map_err(|error| {
            OrchestratorError::Serve(format!(
                "failed to append orchestrator lifecycle event: {error}"
            ))
        })
}

async fn append_pump_lifecycle_event(
    log: &Arc<AnyEventLog>,
    kind: &str,
    payload: JsonValue,
) -> Result<(), OrchestratorError> {
    append_lifecycle_event(log, kind, payload).await
}

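/// Blocks while the pump drain gate is raised, emitting a single
/// `pump_drain_waiting` lifecycle event before waiting; returns once the gate
/// drops or its sender is gone.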
async fn wait_for_pump_drain_release(
    log: &Arc<AnyEventLog>,
    topic: &harn_vm::event_log::Topic,
    event_id: u64,
    gate_rx: &mut watch::Receiver<bool>,
) -> Result<(), OrchestratorError> {
    if !*gate_rx.borrow() {
        return Ok(());
    }
    append_pump_lifecycle_event(
        log,
        "pump_drain_waiting",
        json!({
            "topic": topic.as_str(),
            "event_log_id": event_id,
        }),
    )
    .await?;
    while *gate_rx.borrow() {
        if gate_rx.changed().await.is_err() {
            break;
        }
    }
    Ok(())
}

#[cfg_attr(not(unix), allow(dead_code))]
async fn append_manifest_event(
    log: &Arc<AnyEventLog>,
    kind: &str,
    payload: JsonValue,
) -> Result<(), OrchestratorError> {
    let topic =
        harn_vm::event_log::Topic::new(MANIFEST_TOPIC).map_err(|error| error.to_string())?;
    log.append(&topic, harn_vm::event_log::LogEvent::new(kind, payload))
        .await
        .map(|_| ())
        .map_err(|error| {
            OrchestratorError::Serve(format!(
                "failed to append orchestrator manifest event: {error}"
            ))
        })
}

async fn record_pump_metrics(
    metrics: &harn_vm::MetricsRegistry,
    log: &Arc<AnyEventLog>,
    topic: &harn_vm::event_log::Topic,
    last_seen: u64,
    outstanding: usize,
) -> Result<(), OrchestratorError> {
    let latest = log.latest(topic).await.ok().flatten().unwrap_or(last_seen);
    let backlog = latest.saturating_sub(last_seen);
    metrics.set_orchestrator_pump_outstanding(topic.as_str(), outstanding);
    metrics.set_orchestrator_pump_backlog(topic.as_str(), backlog);
    append_pump_lifecycle_event(
        log,
        "pump_metrics_recorded",
        json!({
            "topic": topic.as_str(),
            "latest": latest,
            "last_seen": last_seen,
            "backlog": backlog,
            "outstanding": outstanding,
        }),
    )
    .await
}

fn admission_delay(occurred_at_ms: i64) -> Duration {
    let now = std::time::SystemTime::now()
        .duration_since(std::time::UNIX_EPOCH)
        .unwrap_or_default()
        .as_millis() as i64;
    Duration::from_millis(now.saturating_sub(occurred_at_ms).max(0) as u64)
}

async fn emit_drain_truncated(
    log: &Arc<AnyEventLog>,
    topic_name: &str,
    report: PumpDrainReport,
    config: DrainConfig,
) -> Result<(), OrchestratorError> {
    if !report.truncated() {
        return Ok(());
    }
    eprintln!(
        "[harn] warning: pump drain truncated for {topic_name}: remaining_queued={} drain_items={} reason={}",
        report.remaining_queued,
        report.drain_items,
        report.stop_reason.as_str()
    );
    append_lifecycle_event(
        log,
        "drain_truncated",
        json!({
            "topic": topic_name,
            "remaining_queued": report.remaining_queued,
            "drain_items": report.drain_items,
            "outstanding_tasks": report.outstanding_tasks,
            "max_items": config.max_items,
            "deadline_secs": config.deadline.as_secs(),
            "reason": report.stop_reason.as_str(),
        }),
    )
    .await
}

async fn topic_latest_id(
    log: &Arc<AnyEventLog>,
    topic_name: &str,
) -> Result<u64, OrchestratorError> {
    let topic = harn_vm::event_log::Topic::new(topic_name).map_err(|error| error.to_string())?;
    log.latest(&topic)
        .await
        .map(|value| value.unwrap_or(0))
        .map_err(|error| {
            OrchestratorError::Serve(format!(
                "failed to read topic head for {topic_name}: {error}"
            ))
        })
}

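/// Drains the pump for a topic within the remaining time budget; on drain
/// errors or timeouts it falls back to a best-effort report derived from the
/// consumer cursor.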
async fn drain_pump_best_effort(
    log: &Arc<AnyEventLog>,
    topic_name: &str,
    pump: PumpHandle,
    config: DrainConfig,
    overall_deadline: tokio::time::Instant,
) -> Result<PumpDrainReport, OrchestratorError> {
    let topic = harn_vm::event_log::Topic::new(topic_name).map_err(|error| error.to_string())?;
    let consumer = pump_consumer_id(&topic)?;
    let start_seen = log
        .consumer_cursor(&topic, &consumer)
        .await
        .map_err(|error| format!("failed to read consumer cursor for {topic_name}: {error}"))?
        .unwrap_or(0);
    let up_to = log
        .latest(&topic)
        .await
        .map_err(|error| format!("failed to read topic head for {topic_name}: {error}"))?
        .unwrap_or(0);
    let budget = remaining_budget(overall_deadline);

    match tokio::time::timeout(
        budget,
        pump.drain(log, topic_name, up_to, config, overall_deadline),
    )
    .await
    {
        Ok(Ok(report)) => Ok(report),
        Ok(Err(error)) => {
            eprintln!("[harn] warning: pump drain error for {topic_name}: {error}");
            best_effort_pump_report(
                log,
                &topic,
                &consumer,
                start_seen,
                up_to,
                PumpDrainStopReason::Error,
            )
            .await
        }
        Err(_) => {
            eprintln!(
                "[harn] warning: pump drain timed out for {topic_name} after {:?}",
                budget
            );
            best_effort_pump_report(
                log,
                &topic,
                &consumer,
                start_seen,
                up_to,
                PumpDrainStopReason::Deadline,
            )
            .await
        }
    }
}

async fn best_effort_pump_report(
    log: &Arc<AnyEventLog>,
    topic: &harn_vm::event_log::Topic,
    consumer: &ConsumerId,
    start_seen: u64,
    up_to: u64,
    stop_reason: PumpDrainStopReason,
) -> Result<PumpDrainReport, OrchestratorError> {
    let last_seen = log
        .consumer_cursor(topic, consumer)
        .await
        .map_err(|error| format!("failed to read consumer cursor for {topic}: {error}"))?
        .unwrap_or(start_seen);
    let stats = PumpStats {
        last_seen,
        processed: last_seen.saturating_sub(start_seen),
    };
    Ok(pump_drain_report(stats, start_seen, up_to, 0, stop_reason))
}

fn remaining_budget(deadline: tokio::time::Instant) -> Duration {
    deadline.saturating_duration_since(tokio::time::Instant::now())
}

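/// Decides whether a drain pass is complete: returns a report once the cursor
/// reaches the target with no outstanding tasks, or when the deadline or
/// max-items budget is exhausted; `None` means keep draining.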
fn maybe_finish_pump_drain(
    stats: PumpStats,
    progress: PumpDrainProgress,
    outstanding_tasks: usize,
) -> Option<PumpDrainReport> {
    if stats.last_seen >= progress.request.up_to && outstanding_tasks == 0 {
        return Some(pump_drain_report(
            stats,
            progress.start_seen,
            progress.request.up_to,
            outstanding_tasks,
            PumpDrainStopReason::Drained,
        ));
    }
    if outstanding_tasks > 0 {
        if tokio::time::Instant::now() >= progress.request.deadline {
            return Some(pump_drain_report(
                stats,
                progress.start_seen,
                progress.request.up_to,
                outstanding_tasks,
                PumpDrainStopReason::Deadline,
            ));
        }
        return None;
    }
    let drain_items = stats.last_seen.saturating_sub(progress.start_seen);
    if drain_items >= progress.request.config.max_items as u64 {
        return Some(pump_drain_report(
            stats,
            progress.start_seen,
            progress.request.up_to,
            outstanding_tasks,
            PumpDrainStopReason::MaxItems,
        ));
    }
    if tokio::time::Instant::now() >= progress.request.deadline {
        return Some(pump_drain_report(
            stats,
            progress.start_seen,
            progress.request.up_to,
            outstanding_tasks,
            PumpDrainStopReason::Deadline,
        ));
    }
    None
}

fn pump_drain_report(
    stats: PumpStats,
    start_seen: u64,
    up_to: u64,
    outstanding_tasks: usize,
    stop_reason: PumpDrainStopReason,
) -> PumpDrainReport {
    PumpDrainReport {
        stats,
        drain_items: stats.last_seen.saturating_sub(start_seen),
        remaining_queued: up_to.saturating_sub(stats.last_seen),
        outstanding_tasks,
        stop_reason,
    }
}

fn pump_consumer_id(topic: &harn_vm::event_log::Topic) -> Result<ConsumerId, OrchestratorError> {
    ConsumerId::new(format!("orchestrator-pump.{}", topic.as_str())).map_err(|error| {
        OrchestratorError::Serve(format!("failed to create consumer id for {topic}: {error}"))
    })
}

fn inbox_task_test_release_file() -> Option<PathBuf> {
    test_file_from_env(TEST_INBOX_TASK_RELEASE_FILE_ENV)
}

fn test_file_from_env(key: &str) -> Option<PathBuf> {
    std::env::var_os(key)
        .filter(|value| !value.is_empty())
        .map(PathBuf::from)
}

async fn wait_for_test_release_file(path: &Path) {
    while tokio::fs::metadata(path).await.is_err() {
        tokio::time::sleep(Duration::from_millis(10)).await;
    }
}

fn pending_pump_test_should_fail() -> bool {
    std::env::var(TEST_FAIL_PENDING_PUMP_ENV)
        .ok()
        .is_some_and(|value| value != "0")
}

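/// Watches the manifest's parent directory (non-recursively) and triggers an
/// admin reload when the manifest file changes, debouncing bursts of file
/// events for 200ms on a local task.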
fn spawn_manifest_watcher(
    config_path: PathBuf,
    reload: AdminReloadHandle,
) -> Result<notify::RecommendedWatcher, OrchestratorError> {
    use notify::{Event, EventKind, RecursiveMode, Watcher};

    let watch_dir = config_path.parent().ok_or_else(|| {
        format!(
            "manifest has no parent directory: {}",
            config_path.display()
        )
    })?;
    let target_name = config_path
        .file_name()
        .and_then(|name| name.to_str())
        .ok_or_else(|| {
            format!(
                "manifest path is not valid UTF-8: {}",
                config_path.display()
            )
        })?
        .to_string();
    let (tx, mut rx) = mpsc::unbounded_channel::<()>();
    tokio::task::spawn_local(async move {
        while rx.recv().await.is_some() {
            tokio::time::sleep(Duration::from_millis(200)).await;
            while rx.try_recv().is_ok() {}
            let _ = reload.trigger("file_watch");
        }
    });
    let mut watcher =
        notify::recommended_watcher(move |res: Result<Event, notify::Error>| match res {
            Ok(event)
                if matches!(
                    event.kind,
                    EventKind::Modify(_)
                        | EventKind::Create(_)
                        | EventKind::Remove(_)
                        | EventKind::Any
                ) && event.paths.iter().any(|path| {
                    path.file_name()
                        .and_then(|name| name.to_str())
                        .is_some_and(|name| name == target_name)
                }) =>
            {
                let _ = tx.send(());
            }
            _ => {}
        })
        .map_err(|error| format!("failed to create manifest watcher: {error}"))?;
    watcher
        .watch(watch_dir, RecursiveMode::NonRecursive)
        .map_err(|error| {
            format!(
                "failed to watch manifest directory {}: {error}",
                watch_dir.display()
            )
        })?;
    Ok(watcher)
}

pub(crate) fn load_manifest(config_path: &Path) -> Result<(Manifest, PathBuf), OrchestratorError> {
    if !config_path.is_file() {
        return Err(format!("manifest not found: {}", config_path.display()).into());
    }
    let content = std::fs::read_to_string(config_path)
        .map_err(|error| format!("failed to read {}: {error}", config_path.display()))?;
    let manifest = toml::from_str::<Manifest>(&content)
        .map_err(|error| format!("failed to parse {}: {error}", config_path.display()))?;
    let manifest_dir = config_path.parent().map(Path::to_path_buf).ok_or_else(|| {
        format!(
            "manifest has no parent directory: {}",
            config_path.display()
        )
    })?;
    Ok((manifest, manifest_dir))
}

pub(crate) fn absolutize_from_cwd(path: &Path) -> Result<PathBuf, OrchestratorError> {
    let candidate = if path.is_absolute() {
        path.to_path_buf()
    } else {
        std::env::current_dir()
            .map_err(|error| format!("failed to read current directory: {error}"))?
            .join(path)
    };
    Ok(candidate)
}

fn configured_secret_chain_display() -> String {
    std::env::var(harn_vm::secrets::SECRET_PROVIDER_CHAIN_ENV)
        .unwrap_or_else(|_| harn_vm::secrets::DEFAULT_SECRET_PROVIDER_CHAIN.to_string())
        .split(',')
        .map(str::trim)
        .filter(|segment| !segment.is_empty())
        .collect::<Vec<_>>()
        .join(" -> ")
}

fn secret_namespace_for(manifest_dir: &Path) -> String {
    match std::env::var("HARN_SECRET_NAMESPACE") {
        Ok(namespace) if !namespace.trim().is_empty() => namespace,
        _ => {
            let leaf = manifest_dir
                .file_name()
                .and_then(|name| name.to_str())
                .filter(|name| !name.is_empty())
                .unwrap_or("workspace");
            format!("harn/{leaf}")
        }
    }
}

fn now_rfc3339() -> Result<String, OrchestratorError> {
    OffsetDateTime::now_utc()
        .format(&Rfc3339)
        .map_err(|error| OrchestratorError::Serve(format!("failed to format timestamp: {error}")))
}

fn write_state_snapshot(
    path: &Path,
    snapshot: &ServeStateSnapshot,
) -> Result<(), OrchestratorError> {
    let encoded = serde_json::to_vec_pretty(snapshot)
        .map_err(|error| format!("failed to encode orchestrator state snapshot: {error}"))?;
    if let Some(parent) = path.parent() {
        std::fs::create_dir_all(parent)
            .map_err(|error| format!("failed to create {}: {error}", parent.display()))?;
    }
    std::fs::write(path, encoded).map_err(|error| {
        OrchestratorError::Serve(format!("failed to write {}: {error}", path.display()))
    })
}

#[derive(Debug, Serialize)]
struct ServeStateSnapshot {
    status: String,
    role: String,
    bind: String,
    listener_url: String,
    manifest_path: String,
    state_dir: String,
    started_at: String,
    stopped_at: Option<String>,
    secret_provider_chain: String,
    event_log_backend: String,
    event_log_location: Option<String>,
    triggers: Vec<TriggerStateSnapshot>,
    connectors: Vec<String>,
    activations: Vec<ConnectorActivationSnapshot>,
}

#[derive(Debug, Serialize)]
struct TriggerStateSnapshot {
    id: String,
    provider: String,
    kind: String,
    handler: String,
    version: Option<u32>,
    state: Option<String>,
    received: u64,
    dispatched: u64,
    failed: u64,
    in_flight: u64,
}

#[derive(Debug, Serialize)]
struct ConnectorActivationSnapshot {
    provider: String,
    binding_count: usize,
}

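/// Minimal stand-in connector for providers without a real implementation: it
/// reports its provider and kinds and accepts init/activate, but rejects
/// inbound normalization and client calls as not implemented.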
struct PlaceholderConnector {
    provider_id: harn_vm::ProviderId,
    kinds: Vec<harn_vm::TriggerKind>,
    _ctx: Option<harn_vm::ConnectorCtx>,
}

impl PlaceholderConnector {
    fn new(provider_id: harn_vm::ProviderId, kinds: BTreeSet<String>) -> Self {
        Self {
            provider_id,
            kinds: kinds.into_iter().map(harn_vm::TriggerKind::from).collect(),
            _ctx: None,
        }
    }
}

struct PlaceholderClient;

#[async_trait]
impl harn_vm::ConnectorClient for PlaceholderClient {
    async fn call(
        &self,
        method: &str,
        _args: JsonValue,
    ) -> Result<JsonValue, harn_vm::ClientError> {
        Err(harn_vm::ClientError::Other(format!(
            "connector client method '{method}' is not implemented in the orchestrator scaffold"
        )))
    }
}

#[async_trait]
impl harn_vm::Connector for PlaceholderConnector {
    fn provider_id(&self) -> &harn_vm::ProviderId {
        &self.provider_id
    }

    fn kinds(&self) -> &[harn_vm::TriggerKind] {
        &self.kinds
    }

    async fn init(&mut self, ctx: harn_vm::ConnectorCtx) -> Result<(), harn_vm::ConnectorError> {
        self._ctx = Some(ctx);
        Ok(())
    }

    async fn activate(
        &self,
        bindings: &[harn_vm::TriggerBinding],
    ) -> Result<harn_vm::ActivationHandle, harn_vm::ConnectorError> {
        Ok(harn_vm::ActivationHandle::new(
            self.provider_id.clone(),
            bindings.len(),
        ))
    }

    async fn normalize_inbound(
        &self,
        _raw: harn_vm::RawInbound,
    ) -> Result<harn_vm::TriggerEvent, harn_vm::ConnectorError> {
        Err(harn_vm::ConnectorError::Unsupported(format!(
            "connector '{}' inbound normalization is not implemented yet",
            self.provider_id.as_str()
        )))
    }

    fn payload_schema(&self) -> harn_vm::ProviderPayloadSchema {
        harn_vm::ProviderPayloadSchema::named("TriggerEvent")
    }

    fn client(&self) -> Arc<dyn harn_vm::ConnectorClient> {
        Arc::new(PlaceholderClient)
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use futures::StreamExt;
    use harn_vm::event_log::{EventLog, Topic};

    fn write_test_file(dir: &Path, relative: &str, contents: &str) {
        let path = dir.join(relative);
        if let Some(parent) = path.parent() {
            std::fs::create_dir_all(parent).unwrap();
        }
        std::fs::write(path, contents).unwrap();
    }

    fn stream_manifest_fixture() -> &'static str {
        r#"
[package]
name = "fixture"

[exports]
handlers = "lib.harn"

[[triggers]]
id = "ws-stream"
kind = "stream"
provider = "websocket"
path = "/streams/ws"
match = { events = ["quote.tick"] }
handler = "handlers::on_stream"
"#
    }

    fn stream_handler_fixture(marker_path: &Path) -> String {
        format!(
            r#"
import "std/triggers"

pub fn on_stream(event: TriggerEvent) {{
    write_file({marker:?}, json_stringify({{
        provider: event.provider,
        kind: event.kind,
        key: event.provider_payload.key,
        stream: event.provider_payload.stream,
        amount: event.provider_payload.raw.value.amount,
    }}))
}}
"#,
            marker = marker_path.display().to_string()
        )
    }

    #[tokio::test(flavor = "multi_thread")]
    async fn stream_trigger_route_uses_generic_stream_connector_in_process() {
        let _env_lock = crate::tests::common::env_lock::lock_env().lock().await;
        let _secret_providers = crate::env_guard::ScopedEnvVar::set("HARN_SECRET_PROVIDERS", "env");

        let temp = tempfile::TempDir::new().unwrap();
        let marker_path = temp.path().join("stream-handler.json");
        write_test_file(temp.path(), "harn.toml", stream_manifest_fixture());
        write_test_file(
            temp.path(),
            "lib.harn",
            &stream_handler_fixture(&marker_path),
        );

        let config =
            OrchestratorConfig::for_test(temp.path().join("harn.toml"), temp.path().join("state"));
        let harness = OrchestratorHarness::start(config)
            .await
            .expect("harness start");
        let base_url = harness.listener_url().to_string();
        let event_log = harness.event_log();

        let response = reqwest::Client::new()
            .post(format!("{base_url}/streams/ws"))
            .header("content-type", "application/json")
            .json(&serde_json::json!({
                "key": "acct-1",
                "stream": "quotes",
                "value": {"amount": 10}
            }))
            .send()
            .await
            .unwrap();
        assert_eq!(response.status(), 200);

        let topic = Topic::new("orchestrator.lifecycle").unwrap();
        let mut stream = event_log.clone().subscribe(&topic, None).await.unwrap();
        let deadline = tokio::time::Instant::now() + std::time::Duration::from_secs(30);
        loop {
            let remaining = deadline
                .checked_duration_since(tokio::time::Instant::now())
                .expect("timed out waiting for pump_dispatch_completed");
            let (_, event) = tokio::time::timeout(remaining, stream.next())
                .await
                .expect("timed out waiting for pump_dispatch_completed event")
                .expect("event stream ended unexpectedly")
                .expect("event stream error");
            if event.kind == "pump_dispatch_completed"
                && event.payload["status"] == serde_json::json!("completed")
            {
                break;
            }
        }
        drop(stream);

        let marker: serde_json::Value =
            serde_json::from_str(&std::fs::read_to_string(&marker_path).unwrap()).unwrap();
        assert_eq!(
            marker.get("provider").and_then(|v| v.as_str()),
            Some("websocket")
        );
        assert_eq!(
            marker.get("kind").and_then(|v| v.as_str()),
            Some("quote.tick")
        );
        assert_eq!(marker.get("key").and_then(|v| v.as_str()), Some("acct-1"));
        assert_eq!(
            marker.get("stream").and_then(|v| v.as_str()),
            Some("quotes")
        );
        assert_eq!(marker.get("amount").and_then(|v| v.as_i64()), Some(10));

        harness
            .shutdown(std::time::Duration::from_secs(5))
            .await
            .expect("harness shutdown");
    }
}