1use std::collections::{BTreeMap, BTreeSet};
2use std::net::SocketAddr;
3use std::path::{Path, PathBuf};
4use std::sync::Arc;
5use std::time::Duration;
6
7use async_trait::async_trait;
8use futures::StreamExt;
9use serde::{Deserialize, Serialize};
10use serde_json::{json, Value as JsonValue};
11use time::format_description::well_known::Rfc3339;
12use time::OffsetDateTime;
13use tokio::sync::{mpsc, oneshot, watch};
14use tokio::task::JoinSet;
15
16use harn_vm::event_log::{AnyEventLog, ConsumerId, EventLog};
17
18use super::common::stranded_envelopes;
19use super::errors::OrchestratorError;
20use super::listener::{
21 AdminReloadHandle, AdminReloadRequest, ListenerConfig, ListenerRuntime, RouteConfig,
22 TriggerMetricSnapshot,
23};
24use super::origin_guard::OriginAllowList;
25use super::role::OrchestratorRole;
26use super::tls::TlsFiles;
27use crate::package::{
28 self, CollectedManifestTrigger, CollectedTriggerHandler, Manifest,
29 ResolvedProviderConnectorConfig, ResolvedProviderConnectorKind, ResolvedTriggerConfig,
30};
31
32const LIFECYCLE_TOPIC: &str = "orchestrator.lifecycle";
33#[cfg_attr(not(unix), allow(dead_code))]
34const MANIFEST_TOPIC: &str = "orchestrator.manifest";
35const STATE_SNAPSHOT_FILE: &str = "orchestrator-state.json";
36const PENDING_TOPIC: &str = "orchestrator.triggers.pending";
37const CRON_TICK_TOPIC: &str = "connectors.cron.tick";
38const TEST_INBOX_TASK_RELEASE_FILE_ENV: &str = "HARN_TEST_ORCHESTRATOR_INBOX_TASK_RELEASE_FILE";
39const TEST_FAIL_PENDING_PUMP_ENV: &str = "HARN_TEST_ORCHESTRATOR_FAIL_PENDING_PUMP";
40const WAITPOINT_SERVICE_INTERVAL: Duration = Duration::from_millis(250);
41
42#[derive(Clone, Copy, Debug, PartialEq, Eq)]
46pub struct DrainConfig {
47 pub max_items: usize,
48 pub deadline: Duration,
49}
50
51impl Default for DrainConfig {
52 fn default() -> Self {
53 Self {
54 max_items: crate::package::default_orchestrator_drain_max_items(),
55 deadline: Duration::from_secs(
56 crate::package::default_orchestrator_drain_deadline_seconds(),
57 ),
58 }
59 }
60}
61
62#[derive(Clone, Copy, Debug, PartialEq, Eq)]
64pub struct PumpConfig {
65 pub max_outstanding: usize,
66}
67
68impl Default for PumpConfig {
69 fn default() -> Self {
70 Self {
71 max_outstanding: crate::package::default_orchestrator_pump_max_outstanding(),
72 }
73 }
74}
75
76#[derive(Clone, Debug)]
78pub struct OrchestratorConfig {
79 pub manifest_path: PathBuf,
80 pub state_dir: PathBuf,
81 pub bind: SocketAddr,
82 pub role: OrchestratorRole,
83 pub watch_manifest: bool,
84 pub mcp: bool,
85 pub mcp_path: String,
86 pub mcp_sse_path: String,
87 pub mcp_messages_path: String,
88 pub tls: Option<TlsFiles>,
89 pub shutdown_timeout: Duration,
90 pub drain: DrainConfig,
91 pub pump: PumpConfig,
92 pub log_format: Option<harn_vm::observability::otel::LogFormat>,
95}
96
97impl OrchestratorConfig {
98 pub fn for_test(manifest_path: PathBuf, state_dir: PathBuf) -> Self {
100 Self {
101 manifest_path,
102 state_dir,
103 bind: "127.0.0.1:0".parse().unwrap(),
104 role: OrchestratorRole::SingleTenant,
105 watch_manifest: false,
106 mcp: false,
107 mcp_path: "/mcp".to_string(),
108 mcp_sse_path: "/sse".to_string(),
109 mcp_messages_path: "/messages".to_string(),
110 tls: None,
111 shutdown_timeout: Duration::from_secs(5),
112 drain: DrainConfig::default(),
113 pump: PumpConfig::default(),
114 log_format: None,
115 }
116 }
117}
118
119#[derive(Debug)]
123pub struct ShutdownReport {
124 #[allow(dead_code)]
125 pub timed_out: bool,
126}
127
128#[derive(Debug)]
130pub struct HarnessError(pub String);
131
132impl std::fmt::Display for HarnessError {
133 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
134 f.write_str(&self.0)
135 }
136}
137
138impl From<OrchestratorError> for HarnessError {
139 fn from(error: OrchestratorError) -> Self {
140 HarnessError(error.to_string())
141 }
142}
143
144impl From<String> for HarnessError {
145 fn from(s: String) -> Self {
146 HarnessError(s)
147 }
148}
149
150#[allow(dead_code)]
157pub struct OrchestratorHarness {
158 event_log: Arc<AnyEventLog>,
159 listener_url: String,
160 local_addr: SocketAddr,
161 state_dir: PathBuf,
162 admin_reload: AdminReloadHandle,
163 shutdown_tx: Arc<watch::Sender<bool>>,
164 pump_drain_gate: PumpDrainGate,
165 join: Option<std::thread::JoinHandle<()>>,
166}
167
168struct ReadyState {
169 event_log: Arc<AnyEventLog>,
170 listener_url: String,
171 local_addr: SocketAddr,
172 state_dir: PathBuf,
173 admin_reload: AdminReloadHandle,
174}
175
176#[derive(Clone)]
177struct PumpDrainGate {
178 hold_tx: watch::Sender<bool>,
179}
180
181impl PumpDrainGate {
182 fn new() -> Self {
183 let (hold_tx, _) = watch::channel(false);
184 Self { hold_tx }
185 }
186
187 fn pause(&self) {
188 let _ = self.hold_tx.send(true);
189 }
190
191 fn release(&self) {
192 let _ = self.hold_tx.send(false);
193 }
194
195 fn subscribe(&self) -> watch::Receiver<bool> {
196 self.hold_tx.subscribe()
197 }
198}
199
200#[allow(dead_code)]
201impl OrchestratorHarness {
202 pub async fn start(config: OrchestratorConfig) -> Result<Self, HarnessError> {
205 let (ready_tx, ready_rx) = oneshot::channel::<Result<ReadyState, OrchestratorError>>();
206 let (shutdown_tx, shutdown_rx) = watch::channel(false);
207 let shutdown_tx = Arc::new(shutdown_tx);
208 let pump_drain_gate = PumpDrainGate::new();
209 let task_pump_drain_gate = pump_drain_gate.clone();
210
211 let join = std::thread::spawn(move || {
212 let rt = tokio::runtime::Builder::new_multi_thread()
219 .worker_threads(2)
220 .enable_all()
221 .build()
222 .expect("failed to build OrchestratorHarness tokio runtime");
223 let local = tokio::task::LocalSet::new();
224 rt.block_on(local.run_until(orchestrator_task(
225 config,
226 ready_tx,
227 shutdown_rx,
228 task_pump_drain_gate,
229 )));
230 });
231
232 match ready_rx.await {
233 Ok(Ok(ready)) => Ok(Self {
234 event_log: ready.event_log,
235 listener_url: ready.listener_url,
236 local_addr: ready.local_addr,
237 state_dir: ready.state_dir,
238 admin_reload: ready.admin_reload,
239 shutdown_tx,
240 pump_drain_gate,
241 join: Some(join),
242 }),
243 Ok(Err(error)) => {
244 let _ = join.join();
245 Err(HarnessError::from(error))
246 }
247 Err(_) => {
248 let _ = join.join();
249 Err(HarnessError(
250 "harness thread exited before signaling readiness".to_string(),
251 ))
252 }
253 }
254 }
255
256 pub fn listener_url(&self) -> &str {
257 &self.listener_url
258 }
259
260 pub fn local_addr(&self) -> SocketAddr {
261 self.local_addr
262 }
263
264 pub fn event_log(&self) -> Arc<AnyEventLog> {
265 self.event_log.clone()
266 }
267
268 pub fn state_dir(&self) -> &Path {
269 &self.state_dir
270 }
271
272 pub fn admin_reload(&self) -> AdminReloadHandle {
275 self.admin_reload.clone()
276 }
277
278 pub fn shutdown_trigger(&self) -> Arc<watch::Sender<bool>> {
281 self.shutdown_tx.clone()
282 }
283
284 pub fn pause_pump_drain(&self) {
287 self.pump_drain_gate.pause();
288 }
289
290 pub fn release_pump_drain(&self) {
292 self.pump_drain_gate.release();
293 }
294
295 pub async fn shutdown(mut self, _deadline: Duration) -> Result<ShutdownReport, HarnessError> {
297 let _ = self.shutdown_tx.send(true);
299 let join = self.join.take().expect("join handle");
301 tokio::task::spawn_blocking(move || join.join())
302 .await
303 .map_err(|_| HarnessError("spawn_blocking join failed".to_string()))?
304 .map_err(|_| HarnessError("harness background thread panicked".to_string()))?;
305 Ok(ShutdownReport { timed_out: false })
306 }
307}
308
309impl Drop for OrchestratorHarness {
310 fn drop(&mut self) {
311 let _ = self.shutdown_tx.send(true);
312 if let Some(join) = self.join.take() {
313 let _ = join.join();
314 }
315 }
316}
317
318async fn orchestrator_task(
321 config: OrchestratorConfig,
322 ready_tx: oneshot::Sender<Result<ReadyState, OrchestratorError>>,
323 shutdown_rx: watch::Receiver<bool>,
324 pump_drain_gate: PumpDrainGate,
325) {
326 if let Err(error) = orchestrator_lifecycle(config, ready_tx, shutdown_rx, pump_drain_gate).await
327 {
328 eprintln!("[harn] orchestrator harness error: {error}");
329 }
330}
331
332async fn orchestrator_lifecycle(
333 config: OrchestratorConfig,
334 ready_tx: oneshot::Sender<Result<ReadyState, OrchestratorError>>,
335 mut shutdown_rx: watch::Receiver<bool>,
336 pump_drain_gate: PumpDrainGate,
337) -> Result<(), OrchestratorError> {
338 harn_vm::reset_thread_local_state();
339
340 let shutdown_timeout = config.shutdown_timeout;
341 let drain_config = config.drain;
342 let pump_config = PumpConfig {
343 max_outstanding: config.pump.max_outstanding.max(1),
344 };
345
346 let state_dir = config.state_dir.clone();
347 std::fs::create_dir_all(&state_dir).map_err(|error| {
348 format!(
349 "failed to create state dir {}: {error}",
350 state_dir.display()
351 )
352 })?;
353
354 let observability = if let Some(log_format) = config.log_format {
355 Some(
356 harn_vm::observability::otel::ObservabilityGuard::install_orchestrator_subscriber(
357 harn_vm::observability::otel::OrchestratorObservabilityConfig {
358 log_format,
359 state_dir: Some(state_dir.clone()),
360 },
361 )?,
362 )
363 } else {
364 None
365 };
366
367 let config_path = absolutize_from_cwd(&config.manifest_path)?;
368 let (manifest, manifest_dir) = load_manifest(&config_path)?;
369 let drain_config = DrainConfig {
370 max_items: drain_config.max_items.max(1),
371 deadline: drain_config.deadline,
372 };
373 let pump_config = PumpConfig {
374 max_outstanding: pump_config.max_outstanding.max(1),
375 };
376
377 let startup_started_at = now_rfc3339()?;
378 let (admin_reload, mut reload_rx) = AdminReloadHandle::channel();
379
380 eprintln!("[harn] orchestrator manifest: {}", config_path.display());
381 if let Some(name) = manifest
382 .package
383 .as_ref()
384 .and_then(|package| package.name.as_deref())
385 {
386 eprintln!("[harn] orchestrator package: {name}");
387 }
388 eprintln!(
389 "[harn] orchestrator role: {} ({})",
390 config.role.as_str(),
391 config.role.registry_mode()
392 );
393 eprintln!("[harn] orchestrator state dir: {}", state_dir.display());
394 tracing::info!(
395 component = "orchestrator",
396 trace_id = "",
397 role = config.role.as_str(),
398 state_dir = %state_dir.display(),
399 manifest = %config_path.display(),
400 "orchestrator starting"
401 );
402
403 let workspace_root = manifest_dir.clone();
404 let mut vm = config
405 .role
406 .build_vm(&workspace_root, &manifest_dir, &state_dir)?;
407
408 let event_log = harn_vm::event_log::active_event_log()
409 .ok_or_else(|| "event log was not installed during VM initialization".to_string())?;
410 let event_log_description = event_log.describe();
411 let tenant_store = if config.role == OrchestratorRole::MultiTenant {
412 let store = harn_vm::TenantStore::load(&state_dir)?;
413 let active_tenants = store
414 .list()
415 .into_iter()
416 .filter(|tenant| tenant.status == harn_vm::TenantStatus::Active)
417 .collect::<Vec<_>>();
418 eprintln!(
419 "[harn] tenants loaded: {} active ({})",
420 active_tenants.len(),
421 active_tenants
422 .iter()
423 .map(|tenant| tenant.scope.id.0.as_str())
424 .collect::<Vec<_>>()
425 .join(", ")
426 );
427 Some(Arc::new(store))
428 } else {
429 None
430 };
431 eprintln!(
432 "[harn] event log: {} {}",
433 event_log_description.backend,
434 event_log_description
435 .location
436 .as_ref()
437 .map(|path| path.display().to_string())
438 .unwrap_or_else(|| "<memory>".to_string())
439 );
440
441 let secret_namespace = secret_namespace_for(&manifest_dir);
442 let secret_chain_display = configured_secret_chain_display();
443 let secret_chain = harn_vm::secrets::configured_default_chain(secret_namespace.clone())
444 .map_err(|error| format!("failed to configure secret providers: {error}"))?;
445 if secret_chain.providers().is_empty() {
446 return Err("secret provider chain resolved to zero providers"
447 .to_string()
448 .into());
449 }
450 eprintln!(
451 "[harn] secret providers: {} (namespace {})",
452 secret_chain_display, secret_namespace
453 );
454 let secret_provider: Arc<dyn harn_vm::secrets::SecretProvider> = Arc::new(secret_chain);
455
456 let extensions = package::load_runtime_extensions(&config_path);
457 let metrics_registry = Arc::new(harn_vm::MetricsRegistry::default());
458 harn_vm::install_active_metrics_registry(metrics_registry.clone());
459 let collected_triggers = package::collect_manifest_triggers(&mut vm, &extensions)
460 .await
461 .map_err(|error| format!("failed to collect manifest triggers: {error}"))?;
462 package::install_collected_manifest_triggers(&collected_triggers).await?;
463 eprintln!(
464 "[harn] registered triggers ({}): {}",
465 collected_triggers.len(),
466 format_trigger_summary(&collected_triggers)
467 );
468
469 let binding_versions = live_manifest_binding_versions();
470 let route_configs = build_route_configs(&collected_triggers, &binding_versions)?;
471 let mut connector_runtime = initialize_connectors(
472 &collected_triggers,
473 event_log.clone(),
474 secret_provider.clone(),
475 metrics_registry.clone(),
476 &extensions.provider_connectors,
477 )
478 .await?;
479 let route_configs = attach_route_connectors(
480 route_configs,
481 &connector_runtime.registry,
482 &extensions.provider_connectors,
483 )?;
484 let connector_clients = connector_runtime.registry.client_map().await;
485 harn_vm::install_active_connector_clients(connector_clients);
486 eprintln!(
487 "[harn] registered connectors ({}): {}",
488 connector_runtime.providers.len(),
489 connector_runtime.providers.join(", ")
490 );
491 eprintln!(
492 "[harn] activated connectors: {}",
493 format_activation_summary(&connector_runtime.activations)
494 );
495 let (mcp_router, mcp_service) = if config.mcp {
496 validate_mcp_paths(
497 &config.mcp_path,
498 &config.mcp_sse_path,
499 &config.mcp_messages_path,
500 )?;
501 if !has_orchestrator_api_keys_configured() && !has_mcp_oauth_configured() {
502 return Err(OrchestratorError::Serve(
503 "--mcp requires HARN_ORCHESTRATOR_API_KEYS or HARN_MCP_OAUTH_AUTHORIZATION_SERVERS so the embedded MCP management surface is authenticated"
504 .to_string(),
505 ));
506 }
507 let service = Arc::new(
508 crate::commands::mcp::serve::McpOrchestratorService::new_local(
509 crate::cli::OrchestratorLocalArgs {
510 config: config_path.clone(),
511 state_dir: state_dir.clone(),
512 },
513 )?,
514 );
515 let router = crate::commands::mcp::serve::http_router_for_service(
516 service.clone(),
517 config.mcp_path.clone(),
518 config.mcp_sse_path.clone(),
519 config.mcp_messages_path.clone(),
520 );
521 eprintln!(
522 "[harn] embedded MCP server mounted at {} (legacy SSE {}, messages {})",
523 config.mcp_path, config.mcp_sse_path, config.mcp_messages_path
524 );
525 (Some(router), Some(service))
526 } else {
527 (None, None)
528 };
529
530 let dispatcher = harn_vm::Dispatcher::with_event_log_and_metrics(
531 vm,
532 event_log.clone(),
533 Some(metrics_registry.clone()),
534 );
535 let mut pending_pumps = vec![(
536 PENDING_TOPIC.to_string(),
537 spawn_pending_pump(
538 event_log.clone(),
539 dispatcher.clone(),
540 pump_config,
541 metrics_registry.clone(),
542 pump_drain_gate.clone(),
543 PENDING_TOPIC,
544 )?,
545 )];
546 let mut inbox_pumps = vec![(
547 harn_vm::TRIGGER_INBOX_ENVELOPES_TOPIC.to_string(),
548 spawn_inbox_pump(
549 event_log.clone(),
550 dispatcher.clone(),
551 pump_config,
552 metrics_registry.clone(),
553 harn_vm::TRIGGER_INBOX_ENVELOPES_TOPIC,
554 )?,
555 )];
556 if let Some(store) = tenant_store.as_ref() {
557 for tenant in store
558 .list()
559 .into_iter()
560 .filter(|tenant| tenant.status == harn_vm::TenantStatus::Active)
561 {
562 let pending_topic = harn_vm::tenant_topic(
563 &tenant.scope.id,
564 &harn_vm::event_log::Topic::new(PENDING_TOPIC)
565 .map_err(|error| error.to_string())?,
566 )
567 .map_err(|error| error.to_string())?;
568 pending_pumps.push((
569 pending_topic.as_str().to_string(),
570 spawn_pending_pump(
571 event_log.clone(),
572 dispatcher.clone(),
573 pump_config,
574 metrics_registry.clone(),
575 pump_drain_gate.clone(),
576 pending_topic.as_str(),
577 )?,
578 ));
579 let inbox_topic = harn_vm::tenant_topic(
580 &tenant.scope.id,
581 &harn_vm::event_log::Topic::new(harn_vm::TRIGGER_INBOX_ENVELOPES_TOPIC)
582 .map_err(|error| error.to_string())?,
583 )
584 .map_err(|error| error.to_string())?;
585 inbox_pumps.push((
586 inbox_topic.as_str().to_string(),
587 spawn_inbox_pump(
588 event_log.clone(),
589 dispatcher.clone(),
590 pump_config,
591 metrics_registry.clone(),
592 inbox_topic.as_str(),
593 )?,
594 ));
595 }
596 }
597 let cron_pump = spawn_cron_pump(
598 event_log.clone(),
599 dispatcher.clone(),
600 pump_config,
601 metrics_registry.clone(),
602 pump_drain_gate.clone(),
603 )?;
604 let waitpoint_pump = spawn_waitpoint_resume_pump(
605 event_log.clone(),
606 dispatcher.clone(),
607 pump_config,
608 metrics_registry.clone(),
609 pump_drain_gate.clone(),
610 )?;
611 let waitpoint_cancel_pump = spawn_waitpoint_cancel_pump(
612 event_log.clone(),
613 dispatcher.clone(),
614 pump_config,
615 metrics_registry.clone(),
616 pump_drain_gate.clone(),
617 )?;
618 let waitpoint_sweeper = spawn_waitpoint_sweeper(dispatcher.clone());
619
620 let listener = ListenerRuntime::start(ListenerConfig {
621 bind: config.bind,
622 tls: config.tls.clone(),
623 event_log: event_log.clone(),
624 secrets: secret_provider.clone(),
625 allowed_origins: OriginAllowList::from_manifest(&manifest.orchestrator.allowed_origins),
626 max_body_bytes: ListenerConfig::max_body_bytes_or_default(
627 manifest.orchestrator.max_body_bytes,
628 ),
629 metrics_registry: metrics_registry.clone(),
630 admin_reload: Some(admin_reload.clone()),
631 mcp_router,
632 routes: route_configs,
633 tenant_store: tenant_store.clone(),
634 session_store: Some(Arc::new(harn_vm::SessionStore::new(event_log.clone()))),
635 })
636 .await?;
637 let local_bind = listener.local_addr();
638 let listener_metrics = listener.trigger_metrics();
639 let mut live_manifest = manifest;
640 let mut live_triggers = collected_triggers;
641 let _manifest_watcher = if config.watch_manifest {
642 Some(spawn_manifest_watcher(
643 config_path.clone(),
644 admin_reload.clone(),
645 )?)
646 } else {
647 None
648 };
649 connector_runtime.activations = connector_runtime
650 .registry
651 .activate_all(&connector_runtime.trigger_registry)
652 .await
653 .map_err(|error| error.to_string())?;
654 eprintln!(
655 "[harn] activated connectors: {}",
656 format_activation_summary(&connector_runtime.activations)
657 );
658
659 listener.mark_ready();
660 eprintln!("[harn] HTTP listener ready on {}", listener.url());
661 tracing::info!(
662 component = "orchestrator",
663 trace_id = "",
664 listener_url = %listener.url(),
665 "HTTP listener ready"
666 );
667
668 write_state_snapshot(
669 &state_dir.join(STATE_SNAPSHOT_FILE),
670 &ServeStateSnapshot {
671 status: "running".to_string(),
672 role: config.role.as_str().to_string(),
673 bind: local_bind.to_string(),
674 listener_url: listener.url(),
675 manifest_path: config_path.display().to_string(),
676 state_dir: state_dir.display().to_string(),
677 started_at: startup_started_at.clone(),
678 stopped_at: None,
679 secret_provider_chain: secret_chain_display.clone(),
680 event_log_backend: event_log_description.backend.to_string(),
681 event_log_location: event_log_description
682 .location
683 .as_ref()
684 .map(|path| path.display().to_string()),
685 triggers: trigger_state_snapshots(&live_triggers, &listener_metrics),
686 connectors: connector_runtime.providers.clone(),
687 activations: connector_runtime
688 .activations
689 .iter()
690 .map(|activation| ConnectorActivationSnapshot {
691 provider: activation.provider.as_str().to_string(),
692 binding_count: activation.binding_count,
693 })
694 .collect(),
695 },
696 )?;
697
698 append_lifecycle_event(
699 &event_log,
700 "startup",
701 json!({
702 "bind": local_bind.to_string(),
703 "manifest": config_path.display().to_string(),
704 "role": config.role.as_str(),
705 "state_dir": state_dir.display().to_string(),
706 "trigger_count": live_triggers.len(),
707 "connector_count": connector_runtime.providers.len(),
708 "tls_enabled": listener.scheme() == "https",
709 "shutdown_timeout_secs": shutdown_timeout.as_secs(),
710 "drain_max_items": drain_config.max_items,
711 "drain_deadline_secs": drain_config.deadline.as_secs(),
712 "pump_max_outstanding": pump_config.max_outstanding,
713 }),
714 )
715 .await?;
716
717 let stranded = stranded_envelopes(&event_log, Duration::ZERO).await?;
718 if !stranded.is_empty() {
719 eprintln!(
720 "[harn] startup found {} stranded inbox envelope(s); inspect with `harn orchestrator queue` and recover explicitly with `harn orchestrator recover --dry-run --envelope-age ...`",
721 stranded.len()
722 );
723 }
724 append_lifecycle_event(
725 &event_log,
726 "startup_stranded_envelopes",
727 json!({
728 "count": stranded.len(),
729 }),
730 )
731 .await?;
732
733 let _ = ready_tx.send(Ok(ReadyState {
735 event_log: event_log.clone(),
736 listener_url: listener.url(),
737 local_addr: local_bind,
738 state_dir: state_dir.clone(),
739 admin_reload: admin_reload.clone(),
740 }));
741
742 let mut ctx = RuntimeCtx {
744 role: config.role,
745 config_path: &config_path,
746 state_dir: &state_dir,
747 bind: local_bind,
748 startup_started_at: &startup_started_at,
749 event_log: &event_log,
750 event_log_description: &event_log_description,
751 secret_chain_display: &secret_chain_display,
752 listener: &listener,
753 connectors: &mut connector_runtime,
754 live_manifest: &mut live_manifest,
755 live_triggers: &mut live_triggers,
756 secret_provider: &secret_provider,
757 metrics_registry: &metrics_registry,
758 mcp_service: mcp_service.as_ref(),
759 reload_rx: &mut reload_rx,
760 };
761
762 loop {
763 tokio::select! {
764 changed = shutdown_rx.changed() => {
765 if changed.is_err() || *shutdown_rx.borrow() {
766 break;
767 }
768 }
769 Some(request) = ctx.reload_rx.recv() => {
770 handle_reload_request(&mut ctx, request).await?;
771 }
772 }
773 }
774
775 listener.mark_not_ready();
776 let shutdown = graceful_shutdown(
777 GracefulShutdownCtx {
778 role: config.role,
779 bind: local_bind,
780 listener_url: listener.url(),
781 config_path: &config_path,
782 state_dir: &state_dir,
783 startup_started_at: &startup_started_at,
784 event_log: &event_log,
785 event_log_description: &event_log_description,
786 secret_chain_display: &secret_chain_display,
787 triggers: &live_triggers,
788 connectors: &connector_runtime,
789 shutdown_timeout,
790 drain_config,
791 },
792 listener,
793 dispatcher,
794 pending_pumps,
795 cron_pump,
796 inbox_pumps,
797 waitpoint_pump,
798 waitpoint_cancel_pump,
799 waitpoint_sweeper,
800 )
801 .await;
802
803 if let Some(obs) = observability {
804 if let Err(error) = obs.shutdown() {
805 if shutdown.is_ok() {
806 return Err(OrchestratorError::Serve(error));
807 }
808 eprintln!("[harn] observability shutdown warning: {error}");
809 }
810 }
811 harn_vm::clear_active_metrics_registry();
812 shutdown
813}
814
815struct ConnectorRuntime {
818 registry: harn_vm::ConnectorRegistry,
819 trigger_registry: harn_vm::TriggerRegistry,
820 handles: Vec<harn_vm::connectors::ConnectorHandle>,
821 providers: Vec<String>,
822 activations: Vec<harn_vm::ActivationHandle>,
823 #[cfg_attr(not(unix), allow(dead_code))]
824 provider_overrides: Vec<ResolvedProviderConnectorConfig>,
825}
826
827#[cfg_attr(not(unix), allow(dead_code))]
828#[derive(Clone, Debug, Default, Serialize)]
829struct ManifestReloadSummary {
830 added: Vec<String>,
831 modified: Vec<String>,
832 removed: Vec<String>,
833 unchanged: Vec<String>,
834}
835
836#[derive(Clone, Copy, Debug, PartialEq, Eq)]
837enum PumpMode {
838 Running,
839 Draining(PumpDrainRequest),
840}
841
842#[derive(Clone, Copy, Debug, PartialEq, Eq)]
843struct PumpDrainRequest {
844 up_to: u64,
845 config: DrainConfig,
846 deadline: tokio::time::Instant,
847}
848
849#[derive(Clone, Copy, Debug, PartialEq, Eq)]
850enum PumpDrainStopReason {
851 Drained,
852 MaxItems,
853 Deadline,
854 Error,
855}
856
857impl PumpDrainStopReason {
858 fn as_str(self) -> &'static str {
859 match self {
860 Self::Drained => "drained",
861 Self::MaxItems => "max_items",
862 Self::Deadline => "deadline",
863 Self::Error => "error",
864 }
865 }
866}
867
868#[derive(Clone, Copy, Debug, Default)]
869struct PumpStats {
870 last_seen: u64,
871 processed: u64,
872}
873
874#[derive(Clone, Copy, Debug)]
875struct PumpDrainProgress {
876 request: PumpDrainRequest,
877 start_seen: u64,
878}
879
880#[derive(Clone, Copy, Debug)]
881struct PumpDrainReport {
882 stats: PumpStats,
883 drain_items: u64,
884 remaining_queued: u64,
885 outstanding_tasks: usize,
886 stop_reason: PumpDrainStopReason,
887}
888
889impl PumpDrainReport {
890 fn truncated(self) -> bool {
891 self.remaining_queued > 0 || self.outstanding_tasks > 0
892 }
893}
894
895struct PumpHandle {
896 mode_tx: watch::Sender<PumpMode>,
897 join: tokio::task::JoinHandle<Result<PumpDrainReport, OrchestratorError>>,
898}
899
900impl PumpHandle {
901 async fn drain(
902 self,
903 log: &Arc<AnyEventLog>,
904 topic_name: &str,
905 up_to: u64,
906 config: DrainConfig,
907 overall_deadline: tokio::time::Instant,
908 ) -> Result<PumpDrainReport, OrchestratorError> {
909 let drain_deadline = std::cmp::min(
910 tokio::time::Instant::now() + config.deadline,
911 overall_deadline,
912 );
913 let _ = self.mode_tx.send(PumpMode::Draining(PumpDrainRequest {
914 up_to,
915 config,
916 deadline: drain_deadline,
917 }));
918 append_pump_lifecycle_event(
919 log,
920 "pump_drain_started",
921 json!({
922 "topic": topic_name,
923 "up_to": up_to,
924 "max_items": config.max_items,
925 "drain_deadline_ms": config.deadline.as_millis(),
926 }),
927 )
928 .await?;
929 match self.join.await {
930 Ok(result) => result,
931 Err(error) => Err(format!("pump task join failed: {error}").into()),
932 }
933 }
934}
935
936struct WaitpointSweepHandle {
937 stop_tx: watch::Sender<bool>,
938 join: tokio::task::JoinHandle<Result<(), OrchestratorError>>,
939}
940
941impl WaitpointSweepHandle {
942 async fn shutdown(self) -> Result<(), OrchestratorError> {
943 let _ = self.stop_tx.send(true);
944 match self.join.await {
945 Ok(result) => result,
946 Err(error) => Err(format!("waitpoint sweeper join failed: {error}").into()),
947 }
948 }
949}
950
951#[derive(Debug, Deserialize)]
952struct PendingTriggerRecord {
953 trigger_id: String,
954 binding_version: u32,
955 event: harn_vm::TriggerEvent,
956}
957
958fn spawn_pending_pump(
961 event_log: Arc<harn_vm::event_log::AnyEventLog>,
962 dispatcher: harn_vm::Dispatcher,
963 pump_config: PumpConfig,
964 metrics_registry: Arc<harn_vm::MetricsRegistry>,
965 pump_drain_gate: PumpDrainGate,
966 topic_name: &str,
967) -> Result<PumpHandle, OrchestratorError> {
968 let topic = harn_vm::event_log::Topic::new(topic_name).map_err(|error| error.to_string())?;
969 spawn_topic_pump(
970 event_log,
971 topic,
972 pump_config,
973 metrics_registry,
974 pump_drain_gate,
975 move |logged| {
976 let dispatcher = dispatcher.clone();
977 async move {
978 if pending_pump_test_should_fail() {
979 return Err("test pending pump failure".to_string().into());
980 }
981 if logged.kind != "trigger_event" {
982 return Ok(false);
983 }
984 let record: PendingTriggerRecord = serde_json::from_value(logged.payload)
985 .map_err(|error| format!("failed to decode pending trigger event: {error}"))?;
986 dispatcher
987 .enqueue_targeted_with_headers(
988 Some(record.trigger_id),
989 Some(record.binding_version),
990 record.event,
991 Some(&logged.headers),
992 )
993 .await
994 .map_err(|error| format!("failed to enqueue pending trigger event: {error}"))?;
995 Ok(true)
996 }
997 },
998 )
999}
1000
1001fn spawn_cron_pump(
1002 event_log: Arc<harn_vm::event_log::AnyEventLog>,
1003 dispatcher: harn_vm::Dispatcher,
1004 pump_config: PumpConfig,
1005 metrics_registry: Arc<harn_vm::MetricsRegistry>,
1006 pump_drain_gate: PumpDrainGate,
1007) -> Result<PumpHandle, OrchestratorError> {
1008 let topic =
1009 harn_vm::event_log::Topic::new(CRON_TICK_TOPIC).map_err(|error| error.to_string())?;
1010 spawn_topic_pump(
1011 event_log,
1012 topic,
1013 pump_config,
1014 metrics_registry,
1015 pump_drain_gate,
1016 move |logged| {
1017 let dispatcher = dispatcher.clone();
1018 async move {
1019 if logged.kind != "trigger_event" {
1020 return Ok(false);
1021 }
1022 let event: harn_vm::TriggerEvent = serde_json::from_value(logged.payload)
1023 .map_err(|error| format!("failed to decode cron trigger event: {error}"))?;
1024 let trigger_id = match &event.provider_payload {
1025 harn_vm::ProviderPayload::Known(
1026 harn_vm::triggers::event::KnownProviderPayload::Cron(payload),
1027 ) => payload.cron_id.clone(),
1028 _ => None,
1029 };
1030 dispatcher
1031 .enqueue_targeted_with_headers(trigger_id, None, event, Some(&logged.headers))
1032 .await
1033 .map_err(|error| format!("failed to enqueue cron trigger event: {error}"))?;
1034 Ok(true)
1035 }
1036 },
1037 )
1038}
1039
1040fn spawn_inbox_pump(
1041 event_log: Arc<harn_vm::event_log::AnyEventLog>,
1042 dispatcher: harn_vm::Dispatcher,
1043 pump_config: PumpConfig,
1044 metrics_registry: Arc<harn_vm::MetricsRegistry>,
1045 topic_name: &str,
1046) -> Result<PumpHandle, OrchestratorError> {
1047 let topic = harn_vm::event_log::Topic::new(topic_name).map_err(|error| error.to_string())?;
1048 let consumer = pump_consumer_id(&topic)?;
1049 let inbox_task_release_file = inbox_task_test_release_file();
1050 let (mode_tx, mut mode_rx) = watch::channel(PumpMode::Running);
1051 let join = tokio::task::spawn_local(async move {
1052 let start_from = event_log
1053 .consumer_cursor(&topic, &consumer)
1054 .await
1055 .map_err(|error| format!("failed to read consumer cursor for {topic}: {error}"))?
1056 .or(event_log
1057 .latest(&topic)
1058 .await
1059 .map_err(|error| format!("failed to read topic head {topic}: {error}"))?);
1060 let mut stream = event_log
1061 .clone()
1062 .subscribe(&topic, start_from)
1063 .await
1064 .map_err(|error| format!("failed to subscribe topic {topic}: {error}"))?;
1065 let mut stats = PumpStats {
1066 last_seen: start_from.unwrap_or(0),
1067 processed: 0,
1068 };
1069 record_pump_metrics(&metrics_registry, &event_log, &topic, stats.last_seen, 0).await?;
1070 let mut drain_progress = None;
1071 let mut tasks = JoinSet::new();
1072
1073 loop {
1074 if let Some(progress) = drain_progress {
1075 if let Some(report) = maybe_finish_pump_drain(stats, progress, tasks.len()) {
1076 return Ok(report);
1077 }
1078 }
1079
1080 let deadline = drain_progress.map(|progress| progress.request.deadline);
1081 let mut deadline_wait = Box::pin(async move {
1082 if let Some(deadline) = deadline {
1083 tokio::time::sleep_until(deadline).await;
1084 } else {
1085 std::future::pending::<()>().await;
1086 }
1087 });
1088
1089 tokio::select! {
1090 changed = mode_rx.changed() => {
1091 if changed.is_err() {
1092 break;
1093 }
1094 if let PumpMode::Draining(request) = *mode_rx.borrow() {
1095 drain_progress.get_or_insert(PumpDrainProgress {
1096 request,
1097 start_seen: stats.last_seen,
1098 });
1099 }
1100 }
1101 _ = &mut deadline_wait => {
1102 if let Some(progress) = drain_progress {
1103 return Ok(pump_drain_report(
1104 stats,
1105 progress.start_seen,
1106 progress.request.up_to,
1107 tasks.len(),
1108 PumpDrainStopReason::Deadline,
1109 ));
1110 }
1111 }
1112 joined = tasks.join_next(), if !tasks.is_empty() => {
1113 match joined {
1114 Some(Ok(())) => {
1115 record_pump_metrics(
1116 &metrics_registry,
1117 &event_log,
1118 &topic,
1119 stats.last_seen,
1120 tasks.len(),
1121 )
1122 .await?;
1123 }
1124 Some(Err(error)) => {
1125 return Err(format!("inbox dispatch task join failed: {error}").into());
1126 }
1127 None => {}
1128 }
1129 }
1130 _ = tokio::time::sleep(Duration::from_millis(25)), if tasks.len() >= pump_config.max_outstanding => {
1131 record_pump_metrics(&metrics_registry, &event_log, &topic, stats.last_seen, tasks.len()).await?;
1132 }
1133 received = stream.next(), if tasks.len() < pump_config.max_outstanding => {
1134 let Some(received) = received else {
1135 break;
1136 };
1137 let (event_id, logged) = received
1138 .map_err(|error| format!("topic pump read failed for {topic}: {error}"))?;
1139 if logged.kind != "event_ingested" {
1140 stats.last_seen = event_id;
1141 event_log
1142 .ack(&topic, &consumer, event_id)
1143 .await
1144 .map_err(|error| format!("failed to ack topic pump cursor for {topic}: {error}"))?;
1145 record_pump_metrics(&metrics_registry, &event_log, &topic, stats.last_seen, tasks.len()).await?;
1146 continue;
1147 }
1148 append_pump_lifecycle_event(
1149 &event_log,
1150 "pump_received",
1151 json!({
1152 "topic": topic.as_str(),
1153 "event_log_id": event_id,
1154 "outstanding": tasks.len(),
1155 "max_outstanding": pump_config.max_outstanding,
1156 }),
1157 )
1158 .await?;
1159 let envelope: harn_vm::triggers::dispatcher::InboxEnvelope =
1160 serde_json::from_value(logged.payload)
1161 .map_err(|error| format!("failed to decode dispatcher inbox event: {error}"))?;
1162 let trigger_id = envelope.trigger_id.clone();
1163 let binding_version = envelope.binding_version;
1164 let trigger_event_id = envelope.event.id.0.clone();
1165 let parent_headers = logged.headers.clone();
1166 append_pump_lifecycle_event(
1167 &event_log,
1168 "pump_eligible",
1169 json!({
1170 "topic": topic.as_str(),
1171 "event_log_id": event_id,
1172 "trigger_id": trigger_id.clone(),
1173 "binding_version": binding_version,
1174 "trigger_event_id": trigger_event_id,
1175 }),
1176 )
1177 .await?;
1178 metrics_registry.record_orchestrator_pump_admission_delay(
1179 topic.as_str(),
1180 admission_delay(logged.occurred_at_ms),
1181 );
1182 append_pump_lifecycle_event(
1183 &event_log,
1184 "pump_admitted",
1185 json!({
1186 "topic": topic.as_str(),
1187 "event_log_id": event_id,
1188 "outstanding_after_admit": tasks.len() + 1,
1189 "max_outstanding": pump_config.max_outstanding,
1190 "trigger_id": trigger_id.clone(),
1191 "binding_version": binding_version,
1192 "trigger_event_id": trigger_event_id,
1193 }),
1194 )
1195 .await?;
1196 let dispatcher = dispatcher.clone();
1197 let task_event_log = event_log.clone();
1198 let task_topic = topic.as_str().to_string();
1199 let inbox_task_release_file = inbox_task_release_file.clone();
1200 tasks.spawn_local(async move {
1201 if let Some(path) = inbox_task_release_file.as_ref() {
1202 wait_for_test_release_file(path).await;
1203 }
1204 let _ = append_pump_lifecycle_event(
1205 &task_event_log,
1206 "pump_dispatch_started",
1207 json!({
1208 "topic": task_topic.clone(),
1209 "event_log_id": event_id,
1210 "trigger_id": trigger_id,
1211 "binding_version": binding_version,
1212 "trigger_event_id": trigger_event_id,
1213 }),
1214 )
1215 .await;
1216 let result = dispatcher
1217 .dispatch_inbox_envelope_with_parent_headers(
1218 envelope,
1219 &parent_headers,
1220 )
1221 .await;
1222 let (status, error_message) = match result {
1223 Ok(_) => ("completed", None),
1224 Err(error) => {
1225 let message = error.to_string();
1226 eprintln!("[harn] inbox dispatch warning: {message}");
1227 ("failed", Some(message))
1228 }
1229 };
1230 let _ = append_pump_lifecycle_event(
1231 &task_event_log,
1232 "pump_dispatch_completed",
1233 json!({
1234 "topic": task_topic,
1235 "event_log_id": event_id,
1236 "status": status,
1237 "error": error_message,
1238 }),
1239 )
1240 .await;
1241 });
1242 stats.last_seen = event_id;
1243 stats.processed += 1;
1244 event_log
1245 .ack(&topic, &consumer, event_id)
1246 .await
1247 .map_err(|error| format!("failed to ack topic pump cursor for {topic}: {error}"))?;
1248 append_pump_lifecycle_event(
1249 &event_log,
1250 "pump_acked",
1251 json!({
1252 "topic": topic.as_str(),
1253 "event_log_id": event_id,
1254 "cursor": event_id,
1255 }),
1256 )
1257 .await?;
1258 record_pump_metrics(&metrics_registry, &event_log, &topic, stats.last_seen, tasks.len()).await?;
1259 }
1260 }
1261 }
1262
1263 while let Some(joined) = tasks.join_next().await {
1264 joined.map_err(|error| format!("inbox dispatch task join failed: {error}"))?;
1265 record_pump_metrics(
1266 &metrics_registry,
1267 &event_log,
1268 &topic,
1269 stats.last_seen,
1270 tasks.len(),
1271 )
1272 .await?;
1273 }
1274
1275 Ok(drain_progress
1276 .map(|progress| {
1277 pump_drain_report(
1278 stats,
1279 progress.start_seen,
1280 progress.request.up_to,
1281 0,
1282 PumpDrainStopReason::Drained,
1283 )
1284 })
1285 .unwrap_or_else(|| PumpDrainReport {
1286 stats,
1287 drain_items: 0,
1288 remaining_queued: 0,
1289 outstanding_tasks: 0,
1290 stop_reason: PumpDrainStopReason::Drained,
1291 }))
1292 });
1293 Ok(PumpHandle { mode_tx, join })
1294}
1295
1296fn spawn_waitpoint_resume_pump(
1297 event_log: Arc<harn_vm::event_log::AnyEventLog>,
1298 dispatcher: harn_vm::Dispatcher,
1299 pump_config: PumpConfig,
1300 metrics_registry: Arc<harn_vm::MetricsRegistry>,
1301 pump_drain_gate: PumpDrainGate,
1302) -> Result<PumpHandle, OrchestratorError> {
1303 let topic = harn_vm::event_log::Topic::new(harn_vm::WAITPOINT_RESUME_TOPIC)
1304 .map_err(|error| error.to_string())?;
1305 spawn_topic_pump(
1306 event_log,
1307 topic,
1308 pump_config,
1309 metrics_registry,
1310 pump_drain_gate,
1311 move |logged| {
1312 let dispatcher = dispatcher.clone();
1313 async move {
1314 harn_vm::process_waitpoint_resume_event(&dispatcher, logged)
1315 .await
1316 .map_err(OrchestratorError::from)
1317 }
1318 },
1319 )
1320}
1321
1322fn spawn_waitpoint_cancel_pump(
1323 event_log: Arc<harn_vm::event_log::AnyEventLog>,
1324 dispatcher: harn_vm::Dispatcher,
1325 pump_config: PumpConfig,
1326 metrics_registry: Arc<harn_vm::MetricsRegistry>,
1327 pump_drain_gate: PumpDrainGate,
1328) -> Result<PumpHandle, OrchestratorError> {
1329 let topic = harn_vm::event_log::Topic::new(harn_vm::TRIGGER_CANCEL_REQUESTS_TOPIC)
1330 .map_err(|error| error.to_string())?;
1331 spawn_topic_pump(
1332 event_log,
1333 topic,
1334 pump_config,
1335 metrics_registry,
1336 pump_drain_gate,
1337 move |logged| {
1338 let dispatcher = dispatcher.clone();
1339 async move {
1340 if logged.kind != "dispatch_cancel_requested" {
1341 return Ok(false);
1342 }
1343 harn_vm::service_waitpoints_once(&dispatcher, None)
1344 .await
1345 .map_err(|error| {
1346 OrchestratorError::Serve(format!(
1347 "failed to service waitpoints on cancel: {error}"
1348 ))
1349 })?;
1350 Ok(true)
1351 }
1352 },
1353 )
1354}
1355
1356fn spawn_waitpoint_sweeper(dispatcher: harn_vm::Dispatcher) -> WaitpointSweepHandle {
1357 let (stop_tx, mut stop_rx) = watch::channel(false);
1358 let join = tokio::task::spawn_local(async move {
1359 let mut interval = tokio::time::interval(WAITPOINT_SERVICE_INTERVAL);
1360 interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
1361 loop {
1362 tokio::select! {
1363 changed = stop_rx.changed() => {
1364 if changed.is_err() || *stop_rx.borrow() {
1365 break;
1366 }
1367 }
1368 _ = interval.tick() => {
1369 harn_vm::service_waitpoints_once(&dispatcher, None)
1370 .await
1371 .map_err(|error| format!("failed to service waitpoints on sweep: {error}"))?;
1372 }
1373 }
1374 }
1375 Ok(())
1376 });
1377 WaitpointSweepHandle { stop_tx, join }
1378}
1379
1380fn spawn_topic_pump<F, Fut>(
1381 event_log: Arc<harn_vm::event_log::AnyEventLog>,
1382 topic: harn_vm::event_log::Topic,
1383 _pump_config: PumpConfig,
1384 metrics_registry: Arc<harn_vm::MetricsRegistry>,
1385 pump_drain_gate: PumpDrainGate,
1386 process: F,
1387) -> Result<PumpHandle, OrchestratorError>
1388where
1389 F: Fn(harn_vm::event_log::LogEvent) -> Fut + 'static,
1390 Fut: std::future::Future<Output = Result<bool, OrchestratorError>> + 'static,
1391{
1392 let consumer = pump_consumer_id(&topic)?;
1393 let mut pump_drain_gate_rx = pump_drain_gate.subscribe();
1394 let (mode_tx, mut mode_rx) = watch::channel(PumpMode::Running);
1395 let join = tokio::task::spawn_local(async move {
1396 let start_from = event_log
1397 .consumer_cursor(&topic, &consumer)
1398 .await
1399 .map_err(|error| format!("failed to read consumer cursor for {topic}: {error}"))?
1400 .or(event_log
1401 .latest(&topic)
1402 .await
1403 .map_err(|error| format!("failed to read topic head {topic}: {error}"))?);
1404 let mut stream = event_log
1405 .clone()
1406 .subscribe(&topic, start_from)
1407 .await
1408 .map_err(|error| format!("failed to subscribe topic {topic}: {error}"))?;
1409 let mut stats = PumpStats {
1410 last_seen: start_from.unwrap_or(0),
1411 processed: 0,
1412 };
1413 record_pump_metrics(&metrics_registry, &event_log, &topic, stats.last_seen, 0).await?;
1414 let mut drain_progress = None;
1415 loop {
1416 if let Some(progress) = drain_progress {
1417 if let Some(report) = maybe_finish_pump_drain(stats, progress, 0) {
1418 return Ok(report);
1419 }
1420 }
1421 let deadline = drain_progress.map(|progress| progress.request.deadline);
1422 let mut deadline_wait = Box::pin(async move {
1423 if let Some(deadline) = deadline {
1424 tokio::time::sleep_until(deadline).await;
1425 } else {
1426 std::future::pending::<()>().await;
1427 }
1428 });
1429 tokio::select! {
1430 changed = mode_rx.changed() => {
1431 if changed.is_err() {
1432 break;
1433 }
1434 if let PumpMode::Draining(request) = *mode_rx.borrow() {
1435 drain_progress.get_or_insert(PumpDrainProgress {
1436 request,
1437 start_seen: stats.last_seen,
1438 });
1439 }
1440 }
1441 _ = &mut deadline_wait => {
1442 if let Some(progress) = drain_progress {
1443 return Ok(pump_drain_report(
1444 stats,
1445 progress.start_seen,
1446 progress.request.up_to,
1447 0,
1448 PumpDrainStopReason::Deadline,
1449 ));
1450 }
1451 }
1452 received = stream.next() => {
1453 let Some(received) = received else {
1454 break;
1455 };
1456 let (event_id, logged) = received
1457 .map_err(|error| format!("topic pump read failed for {topic}: {error}"))?;
1458 record_pump_metrics(&metrics_registry, &event_log, &topic, stats.last_seen, 1).await?;
1459 metrics_registry.record_orchestrator_pump_admission_delay(
1460 topic.as_str(),
1461 admission_delay(logged.occurred_at_ms),
1462 );
1463 wait_for_pump_drain_release(
1464 &event_log,
1465 &topic,
1466 event_id,
1467 &mut pump_drain_gate_rx,
1468 )
1469 .await?;
1470 let handled = process(logged).await?;
1471 stats.last_seen = event_id;
1472 if handled {
1473 stats.processed += 1;
1474 }
1475 event_log
1476 .ack(&topic, &consumer, event_id)
1477 .await
1478 .map_err(|error| format!("failed to ack topic pump cursor for {topic}: {error}"))?;
1479 record_pump_metrics(&metrics_registry, &event_log, &topic, stats.last_seen, 0).await?;
1480 }
1481 }
1482 }
1483 Ok(drain_progress
1484 .map(|progress| {
1485 pump_drain_report(
1486 stats,
1487 progress.start_seen,
1488 progress.request.up_to,
1489 0,
1490 PumpDrainStopReason::Drained,
1491 )
1492 })
1493 .unwrap_or_else(|| PumpDrainReport {
1494 stats,
1495 drain_items: 0,
1496 remaining_queued: 0,
1497 outstanding_tasks: 0,
1498 stop_reason: PumpDrainStopReason::Drained,
1499 }))
1500 });
1501 Ok(PumpHandle { mode_tx, join })
1502}
1503
1504#[allow(clippy::too_many_arguments)]
1507async fn graceful_shutdown(
1508 ctx: GracefulShutdownCtx<'_>,
1509 listener: ListenerRuntime,
1510 dispatcher: harn_vm::Dispatcher,
1511 pending_pumps: Vec<(String, PumpHandle)>,
1512 cron_pump: PumpHandle,
1513 inbox_pumps: Vec<(String, PumpHandle)>,
1514 waitpoint_pump: PumpHandle,
1515 waitpoint_cancel_pump: PumpHandle,
1516 waitpoint_sweeper: WaitpointSweepHandle,
1517) -> Result<(), OrchestratorError> {
1518 eprintln!("[harn] signal received, starting graceful shutdown...");
1519 tracing::info!(
1520 component = "orchestrator",
1521 trace_id = "",
1522 shutdown_timeout_secs = ctx.shutdown_timeout.as_secs(),
1523 "signal received, starting graceful shutdown"
1524 );
1525 let listener_in_flight = listener
1526 .trigger_metrics()
1527 .into_values()
1528 .map(|metrics| metrics.in_flight)
1529 .sum::<u64>();
1530 let dispatcher_before = dispatcher.snapshot();
1531 append_lifecycle_event(
1532 ctx.event_log,
1533 "draining",
1534 json!({
1535 "bind": ctx.bind.to_string(),
1536 "role": ctx.role.as_str(),
1537 "status": "draining",
1538 "http_in_flight": listener_in_flight,
1539 "dispatcher_in_flight": dispatcher_before.in_flight,
1540 "dispatcher_retry_queue_depth": dispatcher_before.retry_queue_depth,
1541 "dispatcher_dlq_depth": dispatcher_before.dlq_depth,
1542 "shutdown_timeout_secs": ctx.shutdown_timeout.as_secs(),
1543 "drain_max_items": ctx.drain_config.max_items,
1544 "drain_deadline_secs": ctx.drain_config.deadline.as_secs(),
1545 }),
1546 )
1547 .await?;
1548
1549 let deadline = tokio::time::Instant::now() + ctx.shutdown_timeout;
1550 let listener_metrics = listener.shutdown(remaining_budget(deadline)).await?;
1551 for handle in &ctx.connectors.handles {
1552 let connector = handle.lock().await;
1553 if let Err(error) = connector.shutdown(remaining_budget(deadline)).await {
1554 eprintln!(
1555 "[harn] connector {} shutdown warning: {error}",
1556 connector.provider_id().as_str()
1557 );
1558 }
1559 }
1560
1561 let mut pending_processed = 0;
1562 for (topic_name, pump) in pending_pumps {
1563 let stats =
1564 drain_pump_best_effort(ctx.event_log, &topic_name, pump, ctx.drain_config, deadline)
1565 .await?;
1566 pending_processed += stats.stats.processed;
1567 emit_drain_truncated(ctx.event_log, &topic_name, stats, ctx.drain_config).await?;
1568 }
1569 let cron_stats = drain_pump_best_effort(
1570 ctx.event_log,
1571 CRON_TICK_TOPIC,
1572 cron_pump,
1573 ctx.drain_config,
1574 deadline,
1575 )
1576 .await?;
1577 emit_drain_truncated(ctx.event_log, CRON_TICK_TOPIC, cron_stats, ctx.drain_config).await?;
1578 let mut inbox_processed = 0;
1579 for (topic_name, pump) in inbox_pumps {
1580 let stats =
1581 drain_pump_best_effort(ctx.event_log, &topic_name, pump, ctx.drain_config, deadline)
1582 .await?;
1583 inbox_processed += stats.stats.processed;
1584 emit_drain_truncated(ctx.event_log, &topic_name, stats, ctx.drain_config).await?;
1585 }
1586 let waitpoint_stats = waitpoint_pump
1587 .drain(
1588 ctx.event_log,
1589 harn_vm::WAITPOINT_RESUME_TOPIC,
1590 topic_latest_id(ctx.event_log, harn_vm::WAITPOINT_RESUME_TOPIC).await?,
1591 ctx.drain_config,
1592 deadline,
1593 )
1594 .await?;
1595 emit_drain_truncated(
1596 ctx.event_log,
1597 harn_vm::WAITPOINT_RESUME_TOPIC,
1598 waitpoint_stats,
1599 ctx.drain_config,
1600 )
1601 .await?;
1602 let waitpoint_cancel_stats = waitpoint_cancel_pump
1603 .drain(
1604 ctx.event_log,
1605 harn_vm::TRIGGER_CANCEL_REQUESTS_TOPIC,
1606 topic_latest_id(ctx.event_log, harn_vm::TRIGGER_CANCEL_REQUESTS_TOPIC).await?,
1607 ctx.drain_config,
1608 deadline,
1609 )
1610 .await?;
1611 emit_drain_truncated(
1612 ctx.event_log,
1613 harn_vm::TRIGGER_CANCEL_REQUESTS_TOPIC,
1614 waitpoint_cancel_stats,
1615 ctx.drain_config,
1616 )
1617 .await?;
1618 waitpoint_sweeper.shutdown().await?;
1619 let drain_report = dispatcher
1620 .drain(remaining_budget(deadline))
1621 .await
1622 .map_err(|error| format!("failed to drain dispatcher: {error}"))?;
1623
1624 let stopped_at = now_rfc3339()?;
1625 let timed_out = !drain_report.drained;
1626 if timed_out {
1627 dispatcher.shutdown();
1628 }
1629 append_lifecycle_event(
1630 ctx.event_log,
1631 "stopped",
1632 json!({
1633 "bind": ctx.bind.to_string(),
1634 "role": ctx.role.as_str(),
1635 "status": "stopped",
1636 "http_in_flight": listener_in_flight,
1637 "dispatcher_in_flight": drain_report.in_flight,
1638 "dispatcher_retry_queue_depth": drain_report.retry_queue_depth,
1639 "dispatcher_dlq_depth": drain_report.dlq_depth,
1640 "pending_events_drained": pending_processed,
1641 "cron_events_drained": cron_stats.stats.processed,
1642 "inbox_events_drained": inbox_processed,
1643 "waitpoint_events_drained": waitpoint_stats.stats.processed,
1644 "waitpoint_cancel_events_drained": waitpoint_cancel_stats.stats.processed,
1645 "timed_out": timed_out,
1646 }),
1647 )
1648 .await?;
1649 ctx.event_log
1650 .flush()
1651 .await
1652 .map_err(|error| format!("failed to flush event log: {error}"))?;
1653
1654 write_state_snapshot(
1655 &ctx.state_dir.join(STATE_SNAPSHOT_FILE),
1656 &ServeStateSnapshot {
1657 status: "stopped".to_string(),
1658 role: ctx.role.as_str().to_string(),
1659 bind: ctx.bind.to_string(),
1660 listener_url: ctx.listener_url.clone(),
1661 manifest_path: ctx.config_path.display().to_string(),
1662 state_dir: ctx.state_dir.display().to_string(),
1663 started_at: ctx.startup_started_at.to_string(),
1664 stopped_at: Some(stopped_at),
1665 secret_provider_chain: ctx.secret_chain_display.to_string(),
1666 event_log_backend: ctx.event_log_description.backend.to_string(),
1667 event_log_location: ctx
1668 .event_log_description
1669 .location
1670 .as_ref()
1671 .map(|path| path.display().to_string()),
1672 triggers: trigger_state_snapshots(ctx.triggers, &listener_metrics),
1673 connectors: ctx.connectors.providers.clone(),
1674 activations: ctx
1675 .connectors
1676 .activations
1677 .iter()
1678 .map(|activation| ConnectorActivationSnapshot {
1679 provider: activation.provider.as_str().to_string(),
1680 binding_count: activation.binding_count,
1681 })
1682 .collect(),
1683 },
1684 )?;
1685
1686 if timed_out {
1687 eprintln!(
1688 "[harn] graceful shutdown timed out with {} dispatches and {} retry waits remaining",
1689 drain_report.in_flight, drain_report.retry_queue_depth
1690 );
1691 }
1692 eprintln!("[harn] graceful shutdown complete");
1693 tracing::info!(
1694 component = "orchestrator",
1695 trace_id = "",
1696 "graceful shutdown complete"
1697 );
1698 Ok(())
1699}
1700
1701struct GracefulShutdownCtx<'a> {
1702 role: OrchestratorRole,
1703 bind: SocketAddr,
1704 listener_url: String,
1705 config_path: &'a Path,
1706 state_dir: &'a Path,
1707 startup_started_at: &'a str,
1708 event_log: &'a Arc<AnyEventLog>,
1709 event_log_description: &'a harn_vm::event_log::EventLogDescription,
1710 secret_chain_display: &'a str,
1711 triggers: &'a [CollectedManifestTrigger],
1712 connectors: &'a ConnectorRuntime,
1713 shutdown_timeout: Duration,
1714 drain_config: DrainConfig,
1715}
1716
1717struct RuntimeCtx<'a> {
1720 role: OrchestratorRole,
1721 config_path: &'a Path,
1722 state_dir: &'a Path,
1723 bind: SocketAddr,
1724 startup_started_at: &'a str,
1725 event_log: &'a Arc<AnyEventLog>,
1726 event_log_description: &'a harn_vm::event_log::EventLogDescription,
1727 secret_chain_display: &'a str,
1728 listener: &'a ListenerRuntime,
1729 connectors: &'a mut ConnectorRuntime,
1730 live_manifest: &'a mut Manifest,
1731 live_triggers: &'a mut Vec<CollectedManifestTrigger>,
1732 secret_provider: &'a Arc<dyn harn_vm::secrets::SecretProvider>,
1733 metrics_registry: &'a Arc<harn_vm::MetricsRegistry>,
1734 mcp_service: Option<&'a Arc<crate::commands::mcp::serve::McpOrchestratorService>>,
1735 #[cfg_attr(not(unix), allow(dead_code))]
1736 reload_rx: &'a mut mpsc::UnboundedReceiver<AdminReloadRequest>,
1737}
1738
1739#[cfg_attr(not(unix), allow(dead_code))]
1740async fn handle_reload_request(
1741 ctx: &mut RuntimeCtx<'_>,
1742 request: AdminReloadRequest,
1743) -> Result<(), OrchestratorError> {
1744 let source = request.source.clone();
1745 match reload_manifest(ctx).await {
1746 Ok(summary) => {
1747 if let Some(mcp_service) = ctx.mcp_service {
1748 mcp_service.notify_manifest_reloaded();
1749 }
1750 write_running_state_snapshot(ctx)?;
1751 append_manifest_event(
1752 ctx.event_log,
1753 "reload_succeeded",
1754 json!({
1755 "source": source,
1756 "summary": summary,
1757 }),
1758 )
1759 .await?;
1760 eprintln!(
1761 "[harn] manifest reload ({source}) applied: +{} ~{} -{}",
1762 summary.added.len(),
1763 summary.modified.len(),
1764 summary.removed.len()
1765 );
1766 if let Some(response_tx) = request.response_tx {
1767 let _ = response_tx.send(serde_json::to_value(&summary).map_err(|error| {
1768 OrchestratorError::Serve(format!("failed to encode reload summary: {error}"))
1769 }));
1770 }
1771 }
1772 Err(error) => {
1773 eprintln!("[harn] manifest reload ({source}) failed: {error}");
1774 append_manifest_event(
1775 ctx.event_log,
1776 "reload_failed",
1777 json!({
1778 "source": source,
1779 "error": error.to_string(),
1780 }),
1781 )
1782 .await?;
1783 if let Some(response_tx) = request.response_tx {
1784 let _ = response_tx.send(Err(error));
1785 }
1786 }
1787 }
1788 Ok(())
1789}
1790
1791#[cfg_attr(not(unix), allow(dead_code))]
1792fn write_running_state_snapshot(ctx: &RuntimeCtx<'_>) -> Result<(), OrchestratorError> {
1793 let listener_metrics = ctx.listener.trigger_metrics();
1794 write_state_snapshot(
1795 &ctx.state_dir.join(STATE_SNAPSHOT_FILE),
1796 &ServeStateSnapshot {
1797 status: "running".to_string(),
1798 role: ctx.role.as_str().to_string(),
1799 bind: ctx.bind.to_string(),
1800 listener_url: ctx.listener.url(),
1801 manifest_path: ctx.config_path.display().to_string(),
1802 state_dir: ctx.state_dir.display().to_string(),
1803 started_at: ctx.startup_started_at.to_string(),
1804 stopped_at: None,
1805 secret_provider_chain: ctx.secret_chain_display.to_string(),
1806 event_log_backend: ctx.event_log_description.backend.to_string(),
1807 event_log_location: ctx
1808 .event_log_description
1809 .location
1810 .as_ref()
1811 .map(|path| path.display().to_string()),
1812 triggers: trigger_state_snapshots(ctx.live_triggers, &listener_metrics),
1813 connectors: ctx.connectors.providers.clone(),
1814 activations: ctx
1815 .connectors
1816 .activations
1817 .iter()
1818 .map(|activation| ConnectorActivationSnapshot {
1819 provider: activation.provider.as_str().to_string(),
1820 binding_count: activation.binding_count,
1821 })
1822 .collect(),
1823 },
1824 )
1825}
1826
1827#[cfg_attr(not(unix), allow(dead_code))]
1828async fn reload_manifest(
1829 ctx: &mut RuntimeCtx<'_>,
1830) -> Result<ManifestReloadSummary, OrchestratorError> {
1831 let (manifest, manifest_dir) = load_manifest(ctx.config_path)?;
1832 let mut vm = ctx
1833 .role
1834 .build_vm(&manifest_dir, &manifest_dir, ctx.state_dir)?;
1835 let extensions = package::load_runtime_extensions(ctx.config_path);
1836 let collected_triggers = package::collect_manifest_triggers(&mut vm, &extensions)
1837 .await
1838 .map_err(|error| format!("failed to collect manifest triggers: {error}"))?;
1839 let summary = summarize_manifest_reload(ctx.live_triggers, &collected_triggers);
1840 let connector_reload =
1841 connector_reload_fingerprint_map(ctx.live_triggers, &ctx.connectors.provider_overrides)
1842 != connector_reload_fingerprint_map(
1843 &collected_triggers,
1844 &extensions.provider_connectors,
1845 );
1846 let next_connector_runtime = if connector_reload {
1847 let mut runtime = initialize_connectors(
1848 &collected_triggers,
1849 ctx.event_log.clone(),
1850 ctx.secret_provider.clone(),
1851 ctx.metrics_registry.clone(),
1852 &extensions.provider_connectors,
1853 )
1854 .await?;
1855 runtime.activations = runtime
1856 .registry
1857 .activate_all(&runtime.trigger_registry)
1858 .await
1859 .map_err(|error| error.to_string())?;
1860 Some(runtime)
1861 } else {
1862 None
1863 };
1864 let previous_manifest = ctx.live_manifest.clone();
1865 let previous_triggers = ctx.live_triggers.clone();
1866 package::install_collected_manifest_triggers(&collected_triggers).await?;
1867 let binding_versions = live_manifest_binding_versions();
1868 let route_registry = next_connector_runtime
1869 .as_ref()
1870 .map(|runtime| &runtime.registry)
1871 .unwrap_or(&ctx.connectors.registry);
1872 let route_overrides = next_connector_runtime
1873 .as_ref()
1874 .map(|runtime| runtime.provider_overrides.as_slice())
1875 .unwrap_or(ctx.connectors.provider_overrides.as_slice());
1876 let route_configs = match build_route_configs(&collected_triggers, &binding_versions)
1877 .and_then(|routes| attach_route_connectors(routes, route_registry, route_overrides))
1878 {
1879 Ok(routes) => routes,
1880 Err(error) => {
1881 rollback_manifest_reload(ctx, &previous_manifest, &previous_triggers)
1882 .await
1883 .map_err(|rollback| format!("{error}; rollback failed: {rollback}"))?;
1884 return Err(error);
1885 }
1886 };
1887 if let Err(error) = ctx.listener.reload_routes(route_configs) {
1888 rollback_manifest_reload(ctx, &previous_manifest, &previous_triggers)
1889 .await
1890 .map_err(|rollback| format!("{error}; rollback failed: {rollback}"))?;
1891 return Err(error);
1892 }
1893 if let Some(runtime) = next_connector_runtime {
1894 let previous_handles = ctx.connectors.handles.clone();
1895 let connector_clients = runtime.registry.client_map().await;
1896 harn_vm::install_active_connector_clients(connector_clients);
1897 *ctx.connectors = runtime;
1898 for handle in previous_handles {
1899 let connector = handle.lock().await;
1900 if let Err(error) = connector.shutdown(Duration::from_secs(5)).await {
1901 eprintln!(
1902 "[harn] connector {} reload shutdown warning: {error}",
1903 connector.provider_id().as_str()
1904 );
1905 }
1906 }
1907 }
1908 *ctx.live_manifest = manifest;
1909 *ctx.live_triggers = collected_triggers;
1910 Ok(summary)
1911}
1912
1913#[cfg_attr(not(unix), allow(dead_code))]
1914async fn rollback_manifest_reload(
1915 ctx: &mut RuntimeCtx<'_>,
1916 previous_manifest: &Manifest,
1917 previous_triggers: &[CollectedManifestTrigger],
1918) -> Result<(), OrchestratorError> {
1919 package::install_collected_manifest_triggers(previous_triggers).await?;
1920 let binding_versions = live_manifest_binding_versions();
1921 let route_configs = build_route_configs(previous_triggers, &binding_versions)?;
1922 let route_configs = attach_route_connectors(
1923 route_configs,
1924 &ctx.connectors.registry,
1925 &ctx.connectors.provider_overrides,
1926 )?;
1927 ctx.listener.reload_routes(route_configs)?;
1928 *ctx.live_manifest = previous_manifest.clone();
1929 *ctx.live_triggers = previous_triggers.to_vec();
1930 Ok(())
1931}
1932
1933#[cfg_attr(not(unix), allow(dead_code))]
1934fn summarize_manifest_reload(
1935 current: &[CollectedManifestTrigger],
1936 next: &[CollectedManifestTrigger],
1937) -> ManifestReloadSummary {
1938 let current_map = trigger_fingerprint_map(current, true);
1939 let next_map = trigger_fingerprint_map(next, true);
1940 let mut summary = ManifestReloadSummary::default();
1941 let ids: BTreeSet<String> = current_map.keys().chain(next_map.keys()).cloned().collect();
1942 for id in ids {
1943 match (current_map.get(&id), next_map.get(&id)) {
1944 (None, Some(_)) => summary.added.push(id),
1945 (Some(_), None) => summary.removed.push(id),
1946 (Some(left), Some(right)) if left == right => summary.unchanged.push(id),
1947 (Some(_), Some(_)) => summary.modified.push(id),
1948 (None, None) => {}
1949 }
1950 }
1951 summary
1952}
1953
1954#[cfg_attr(not(unix), allow(dead_code))]
1955fn trigger_fingerprint_map(
1956 triggers: &[CollectedManifestTrigger],
1957 include_http_managed: bool,
1958) -> BTreeMap<String, String> {
1959 triggers
1960 .iter()
1961 .filter(|trigger| include_http_managed || !is_http_managed_trigger(trigger))
1962 .map(|trigger| {
1963 let spec = package::manifest_trigger_binding_spec(trigger.clone());
1964 (trigger.config.id.clone(), spec.definition_fingerprint)
1965 })
1966 .collect()
1967}
1968
1969#[cfg_attr(not(unix), allow(dead_code))]
1970fn connector_reload_fingerprint_map(
1971 triggers: &[CollectedManifestTrigger],
1972 provider_overrides: &[ResolvedProviderConnectorConfig],
1973) -> BTreeMap<String, Vec<String>> {
1974 let mut by_provider = BTreeMap::<String, Vec<String>>::new();
1975 for trigger in triggers {
1976 let provider = trigger.config.provider.as_str().to_string();
1977 if !connector_owns_ingress(&provider, provider_overrides)
1978 && matches!(
1979 trigger.config.kind,
1980 crate::package::TriggerKind::Webhook | crate::package::TriggerKind::A2aPush
1981 )
1982 {
1983 continue;
1984 }
1985 let spec = package::manifest_trigger_binding_spec(trigger.clone());
1986 by_provider
1987 .entry(provider)
1988 .or_default()
1989 .push(spec.definition_fingerprint);
1990 }
1991 for override_config in provider_overrides {
1992 by_provider
1993 .entry(override_config.id.as_str().to_string())
1994 .or_default()
1995 .push(provider_connector_fingerprint(override_config));
1996 }
1997 for fingerprints in by_provider.values_mut() {
1998 fingerprints.sort();
1999 }
2000 by_provider
2001}
2002
2003#[cfg_attr(not(unix), allow(dead_code))]
2004fn provider_connector_fingerprint(config: &ResolvedProviderConnectorConfig) -> String {
2005 match &config.connector {
2006 ResolvedProviderConnectorKind::RustBuiltin => format!(
2007 "{}::builtin@{}",
2008 config.id.as_str(),
2009 config.manifest_dir.display()
2010 ),
2011 ResolvedProviderConnectorKind::Harn { module } => format!(
2012 "{}::harn:{}@{}",
2013 config.id.as_str(),
2014 module,
2015 config.manifest_dir.display()
2016 ),
2017 ResolvedProviderConnectorKind::Invalid(message) => format!(
2018 "{}::invalid:{}@{}",
2019 config.id.as_str(),
2020 message,
2021 config.manifest_dir.display()
2022 ),
2023 }
2024}
2025
2026#[cfg_attr(not(unix), allow(dead_code))]
2027fn is_http_managed_trigger(trigger: &CollectedManifestTrigger) -> bool {
2028 matches!(
2029 trigger.config.kind,
2030 crate::package::TriggerKind::Webhook | crate::package::TriggerKind::A2aPush
2031 )
2032}
2033
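/// Builds the connector and trigger registries for the collected manifest
/// triggers: provider overrides from the manifest win, package-backed providers
/// without a Harn connector are rejected, and remaining providers fall back to
/// the built-in cron connector or a placeholder. Every connector is initialized
/// with a shared `ConnectorCtx` before the runtime is returned.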
2034async fn initialize_connectors(
2037 triggers: &[CollectedManifestTrigger],
2038 event_log: Arc<harn_vm::event_log::AnyEventLog>,
2039 secrets: Arc<dyn harn_vm::secrets::SecretProvider>,
2040 metrics: Arc<harn_vm::MetricsRegistry>,
2041 provider_overrides: &[ResolvedProviderConnectorConfig],
2042) -> Result<ConnectorRuntime, OrchestratorError> {
2043 let mut registry = harn_vm::ConnectorRegistry::default();
2044 let mut trigger_registry = harn_vm::TriggerRegistry::default();
2045 let mut grouped_kinds: BTreeMap<harn_vm::ProviderId, BTreeSet<String>> = BTreeMap::new();
2046
2047 for trigger in triggers {
2048 let binding = trigger_binding_for(&trigger.config)?;
2049 grouped_kinds
2050 .entry(binding.provider.clone())
2051 .or_default()
2052 .insert(binding.kind.as_str().to_string());
2053 trigger_registry.register(binding);
2054 }
2055
2056 let ctx = harn_vm::ConnectorCtx {
2057 inbox: Arc::new(
2058 harn_vm::InboxIndex::new(event_log.clone(), metrics.clone())
2059 .await
2060 .map_err(|error| error.to_string())?,
2061 ),
2062 event_log,
2063 secrets,
2064 metrics,
2065 rate_limiter: Arc::new(harn_vm::RateLimiterFactory::default()),
2066 };
2067
2068 let mut providers = Vec::new();
2069 let mut handles = Vec::new();
2070 for (provider, kinds) in grouped_kinds {
2071 let provider_name = provider.as_str().to_string();
2072 if let Some(connector) = connector_override_for(&provider, provider_overrides).await? {
2073 registry.remove(&provider);
2074 registry
2075 .register(connector)
2076 .map_err(|error| error.to_string())?;
2077 }
2078 if registry.get(&provider).is_none() {
2079 if provider_requires_harn_connector(provider.as_str()) {
2080 return Err(format!(
2081 "provider '{}' is package-backed; add [[providers]] id = \"{}\" with \
2082 connector = {{ harn = \"...\" }} to the manifest",
2083 provider.as_str(),
2084 provider.as_str()
2085 )
2086 .into());
2087 }
2088 let connector = connector_for(&provider, kinds);
2089 registry
2090 .register(connector)
2091 .map_err(|error| error.to_string())?;
2092 }
2093 let handle = registry
2094 .get(&provider)
2095 .ok_or_else(|| format!("connector registry lost provider '{}'", provider.as_str()))?;
2096 handle
2097 .lock()
2098 .await
2099 .init(ctx.clone())
2100 .await
2101 .map_err(|error| error.to_string())?;
2102 handles.push(handle.clone());
2103 providers.push(provider_name);
2104 }
2105
2106 Ok(ConnectorRuntime {
2107 registry,
2108 trigger_registry,
2109 handles,
2110 providers,
2111 activations: Vec::new(),
2112 provider_overrides: provider_overrides.to_vec(),
2113 })
2114}
2115
2116fn trigger_binding_for(
2117 config: &ResolvedTriggerConfig,
2118) -> Result<harn_vm::TriggerBinding, OrchestratorError> {
2119 Ok(harn_vm::TriggerBinding {
2120 provider: config.provider.clone(),
2121 kind: harn_vm::TriggerKind::from(trigger_kind_name(config.kind)),
2122 binding_id: config.id.clone(),
2123 dedupe_key: config.dedupe_key.clone(),
2124 dedupe_retention_days: config.retry.retention_days,
2125 config: connector_binding_config(config)?,
2126 })
2127}
2128
2129fn connector_binding_config(
2130 config: &ResolvedTriggerConfig,
2131) -> Result<JsonValue, OrchestratorError> {
2132 match config.kind {
2133 crate::package::TriggerKind::Cron => {
2134 serde_json::to_value(&config.kind_specific).map_err(|error| {
2135 OrchestratorError::Serve({
2136 format!(
2137 "failed to encode cron trigger config '{}': {error}",
2138 config.id
2139 )
2140 })
2141 })
2142 }
2143 crate::package::TriggerKind::Webhook => Ok(serde_json::json!({
2144 "match": config.match_,
2145 "secrets": config.secrets,
2146 "webhook": config.kind_specific,
2147 })),
2148 crate::package::TriggerKind::Poll => Ok(serde_json::json!({
2149 "match": config.match_,
2150 "secrets": config.secrets,
2151 "poll": config.kind_specific,
2152 })),
2153 crate::package::TriggerKind::Stream => Ok(serde_json::json!({
2154 "match": config.match_,
2155 "secrets": config.secrets,
2156 "stream": config.kind_specific,
2157 "window": config.window,
2158 })),
2159 crate::package::TriggerKind::A2aPush => Ok(serde_json::json!({
2160 "match": config.match_,
2161 "secrets": config.secrets,
2162 "a2a_push": a2a_push_connector_config(&config.kind_specific)?,
2163 })),
2164 _ => Ok(JsonValue::Null),
2165 }
2166}
2167
2168fn a2a_push_connector_config(
2169 kind_specific: &BTreeMap<String, toml::Value>,
2170) -> Result<JsonValue, OrchestratorError> {
2171 if let Some(nested) = kind_specific.get("a2a_push") {
2172 return serde_json::to_value(nested).map_err(|error| {
2173 OrchestratorError::Serve(format!("failed to encode a2a_push trigger config: {error}"))
2174 });
2175 }
2176 let filtered = kind_specific
2177 .iter()
2178 .filter(|(key, _)| key.as_str() != "path")
2179 .map(|(key, value)| (key.clone(), value.clone()))
2180 .collect::<BTreeMap<_, _>>();
2181 serde_json::to_value(filtered).map_err(|error| {
2182 OrchestratorError::Serve(format!("failed to encode a2a_push trigger config: {error}"))
2183 })
2184}
2185
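// Illustrative sketch (not part of the original module): exercises
// `a2a_push_connector_config` with a hand-built kind-specific table. The key
// names ("path", "queue") are example values, not fields the orchestrator
// requires.
#[cfg(test)]
mod a2a_push_connector_config_sketch {
    use super::a2a_push_connector_config;
    use std::collections::BTreeMap;

    #[test]
    fn drops_listener_owned_path_when_no_nested_table_is_present() {
        let mut kind_specific = BTreeMap::new();
        kind_specific.insert(
            "path".to_string(),
            toml::Value::String("/a2a/push".to_string()),
        );
        kind_specific.insert(
            "queue".to_string(),
            toml::Value::String("tasks".to_string()),
        );
        let value = a2a_push_connector_config(&kind_specific)
            .map_err(|error| error.to_string())
            .expect("encode a2a_push config");
        assert!(value.get("path").is_none());
        assert_eq!(value.get("queue").and_then(|v| v.as_str()), Some("tasks"));
    }
}
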
2186fn connector_for(
2187 provider: &harn_vm::ProviderId,
2188 kinds: BTreeSet<String>,
2189) -> Box<dyn harn_vm::Connector> {
2190 match provider.as_str() {
2191 "cron" => Box::new(harn_vm::CronConnector::new()),
2192 _ => Box::new(PlaceholderConnector::new(provider.clone(), kinds)),
2193 }
2194}
2195
2196async fn connector_override_for(
2197 provider: &harn_vm::ProviderId,
2198 provider_overrides: &[ResolvedProviderConnectorConfig],
2199) -> Result<Option<Box<dyn harn_vm::Connector>>, OrchestratorError> {
2200 let Some(override_config) = provider_overrides
2201 .iter()
2202 .find(|entry| entry.id == *provider)
2203 else {
2204 return Ok(None);
2205 };
2206 match &override_config.connector {
2207 ResolvedProviderConnectorKind::RustBuiltin => Ok(None),
2208 ResolvedProviderConnectorKind::Invalid(message) => {
2209 Err(OrchestratorError::Serve(message.clone()))
2210 }
2211 ResolvedProviderConnectorKind::Harn { module } => {
2212 let module_path =
2213 harn_vm::resolve_module_import_path(&override_config.manifest_dir, module);
2214 let connector = harn_vm::HarnConnector::load(&module_path)
2215 .await
2216 .map_err(|error| {
2217 format!(
2218 "failed to load Harn connector '{}' for provider '{}': {error}",
2219 module_path.display(),
2220 provider.as_str()
2221 )
2222 })?;
2223 Ok(Some(Box::new(connector)))
2224 }
2225 }
2226}
2227
2228fn build_route_configs(
2229 triggers: &[CollectedManifestTrigger],
2230 binding_versions: &BTreeMap<String, u32>,
2231) -> Result<Vec<RouteConfig>, OrchestratorError> {
2232 let mut seen_paths = BTreeSet::new();
2233 let mut routes = Vec::new();
2234 for trigger in triggers {
2235 let Some(binding_version) = binding_versions.get(&trigger.config.id).copied() else {
2236 return Err(format!(
2237 "trigger registry is missing active manifest binding '{}'",
2238 trigger.config.id
2239 )
2240 .into());
2241 };
2242 if let Some(route) = RouteConfig::from_trigger(trigger, binding_version)? {
2243 if !seen_paths.insert(route.path.clone()) {
2244 return Err(format!(
2245 "trigger route '{}' is configured more than once",
2246 route.path
2247 )
2248 .into());
2249 }
2250 routes.push(route);
2251 }
2252 }
2253 Ok(routes)
2254}
2255
2256fn attach_route_connectors(
2257 routes: Vec<RouteConfig>,
2258 registry: &harn_vm::ConnectorRegistry,
2259 provider_overrides: &[ResolvedProviderConnectorConfig],
2260) -> Result<Vec<RouteConfig>, OrchestratorError> {
2261 routes
2262 .into_iter()
2263 .map(|mut route| {
2264 if route.connector_ingress
2265 || connector_owns_ingress(route.provider.as_str(), provider_overrides)
2266 {
2267 route.connector = Some(registry.get(&route.provider).ok_or_else(|| {
2268 format!(
2269 "connector registry is missing provider '{}'",
2270 route.provider.as_str()
2271 )
2272 })?);
2273 }
2274 Ok(route)
2275 })
2276 .collect()
2277}
2278
2279fn connector_owns_ingress(
2280 provider: &str,
2281 provider_overrides: &[ResolvedProviderConnectorConfig],
2282) -> bool {
2283 provider_overrides.iter().any(|entry| {
2284 entry.id.as_str() == provider
2285 && matches!(entry.connector, ResolvedProviderConnectorKind::Harn { .. })
2286 })
2287}
2288
2289fn provider_requires_harn_connector(provider: &str) -> bool {
2290 harn_vm::provider_metadata(provider).is_some_and(|metadata| {
2291 matches!(
2292 metadata.runtime,
2293 harn_vm::ProviderRuntimeMetadata::Placeholder
2294 )
2295 })
2296}
2297
2298fn live_manifest_binding_versions() -> BTreeMap<String, u32> {
2299 let mut versions = BTreeMap::new();
2300 for binding in harn_vm::snapshot_trigger_bindings() {
2301 if binding.source != harn_vm::TriggerBindingSource::Manifest {
2302 continue;
2303 }
2304 if binding.state == harn_vm::TriggerState::Terminated {
2305 continue;
2306 }
2307 versions
2308 .entry(binding.id)
2309 .and_modify(|current: &mut u32| *current = (*current).max(binding.version))
2310 .or_insert(binding.version);
2311 }
2312 versions
2313}
2314
2315fn trigger_state_snapshots(
2316 triggers: &[CollectedManifestTrigger],
2317 listener_metrics: &BTreeMap<String, TriggerMetricSnapshot>,
2318) -> Vec<TriggerStateSnapshot> {
2319 let bindings_by_id = harn_vm::snapshot_trigger_bindings()
2320 .into_iter()
2321 .filter(|binding| binding.source == harn_vm::TriggerBindingSource::Manifest)
2322 .fold(
2323 BTreeMap::<String, harn_vm::TriggerBindingSnapshot>::new(),
2324 |mut acc, binding| {
2325 match acc.get(&binding.id) {
2326 Some(current) if current.version >= binding.version => {}
2327 _ => {
2328 acc.insert(binding.id.clone(), binding);
2329 }
2330 }
2331 acc
2332 },
2333 );
2334
2335 triggers
2336 .iter()
2337 .map(|trigger| {
2338 let runtime = bindings_by_id.get(&trigger.config.id);
2339 let metrics = listener_metrics.get(&trigger.config.id);
2340 TriggerStateSnapshot {
2341 id: trigger.config.id.clone(),
2342 provider: trigger.config.provider.as_str().to_string(),
2343 kind: trigger_kind_name(trigger.config.kind).to_string(),
2344 handler: handler_kind(&trigger.handler).to_string(),
2345 version: runtime.map(|binding| binding.version),
2346 state: runtime.map(|binding| binding.state.as_str().to_string()),
2347 received: metrics.map(|value| value.received).unwrap_or(0),
2348 dispatched: metrics.map(|value| value.dispatched).unwrap_or(0),
2349 failed: metrics.map(|value| value.failed).unwrap_or(0),
2350 in_flight: metrics.map(|value| value.in_flight).unwrap_or(0),
2351 }
2352 })
2353 .collect()
2354}
2355
2356fn format_trigger_summary(triggers: &[CollectedManifestTrigger]) -> String {
2359 if triggers.is_empty() {
2360 return "none".to_string();
2361 }
2362 triggers
2363 .iter()
2364 .map(|trigger| {
2365 format!(
2366 "{} [{}:{} -> {}]",
2367 trigger.config.id,
2368 trigger.config.provider.as_str(),
2369 trigger_kind_name(trigger.config.kind),
2370 handler_kind(&trigger.handler)
2371 )
2372 })
2373 .collect::<Vec<_>>()
2374 .join(", ")
2375}
2376
2377fn format_activation_summary(activations: &[harn_vm::ActivationHandle]) -> String {
2378 if activations.is_empty() {
2379 return "none".to_string();
2380 }
2381 activations
2382 .iter()
2383 .map(|activation| {
2384 format!(
2385 "{}({})",
2386 activation.provider.as_str(),
2387 activation.binding_count
2388 )
2389 })
2390 .collect::<Vec<_>>()
2391 .join(", ")
2392}
2393
2394fn handler_kind(handler: &CollectedTriggerHandler) -> &'static str {
2395 match handler {
2396 CollectedTriggerHandler::Local { .. } => "local",
2397 CollectedTriggerHandler::A2a { .. } => "a2a",
2398 CollectedTriggerHandler::Worker { .. } => "worker",
2399 CollectedTriggerHandler::Persona { .. } => "persona",
2400 }
2401}
2402
2403fn trigger_kind_name(kind: crate::package::TriggerKind) -> &'static str {
2404 match kind {
2405 crate::package::TriggerKind::Webhook => "webhook",
2406 crate::package::TriggerKind::Cron => "cron",
2407 crate::package::TriggerKind::Poll => "poll",
2408 crate::package::TriggerKind::Stream => "stream",
2409 crate::package::TriggerKind::Predicate => "predicate",
2410 crate::package::TriggerKind::A2aPush => "a2a-push",
2411 }
2412}
2413
2414fn has_orchestrator_api_keys_configured() -> bool {
2415 std::env::var("HARN_ORCHESTRATOR_API_KEYS")
2416 .ok()
2417 .is_some_and(|value| value.split(',').any(|segment| !segment.trim().is_empty()))
2418}
2419
2420fn has_mcp_oauth_configured() -> bool {
2421 std::env::var("HARN_MCP_OAUTH_AUTHORIZATION_SERVERS")
2422 .ok()
2423 .is_some_and(|value| value.split(',').any(|segment| !segment.trim().is_empty()))
2424}
2425
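// Illustrative sketch (not part of the original module): the auth checks above
// treat a value as "configured" only if some comma-separated segment is
// non-empty after trimming. Reuses the crate's env-lock and ScopedEnvVar test
// guards; the key names and URL below are example values, and the guard is
// assumed to restore the variable when dropped.
#[cfg(test)]
mod auth_env_sketch {
    use super::{has_mcp_oauth_configured, has_orchestrator_api_keys_configured};

    #[tokio::test]
    async fn blank_segments_do_not_count_as_configured() {
        let _env_lock = crate::tests::common::env_lock::lock_env().lock().await;
        {
            let _keys = crate::env_guard::ScopedEnvVar::set("HARN_ORCHESTRATOR_API_KEYS", " , ,");
            assert!(!has_orchestrator_api_keys_configured());
        }
        let _keys =
            crate::env_guard::ScopedEnvVar::set("HARN_ORCHESTRATOR_API_KEYS", "key-a,key-b");
        assert!(has_orchestrator_api_keys_configured());

        let _oauth = crate::env_guard::ScopedEnvVar::set(
            "HARN_MCP_OAUTH_AUTHORIZATION_SERVERS",
            "https://auth.example.com",
        );
        assert!(has_mcp_oauth_configured());
    }
}
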
2426fn validate_mcp_paths(
2427 path: &str,
2428 sse_path: &str,
2429 messages_path: &str,
2430) -> Result<(), OrchestratorError> {
2431 let reserved = [
2432 "/health",
2433 "/healthz",
2434 "/readyz",
2435 "/metrics",
2436 "/admin/reload",
2437 "/acp",
2438 ];
2439 let mut seen = BTreeSet::new();
2440 for (label, value) in [
2441 ("--mcp-path", path),
2442 ("--mcp-sse-path", sse_path),
2443 ("--mcp-messages-path", messages_path),
2444 ] {
2445 if !value.starts_with('/') {
2446 return Err(format!("{label} must start with '/'").into());
2447 }
2448 if value == "/" {
2449 return Err(format!("{label} cannot be '/'").into());
2450 }
2451 if reserved.contains(&value) {
2452 return Err(format!("{label} cannot use reserved listener path '{value}'").into());
2453 }
2454 if !seen.insert(value) {
2455 return Err(format!("embedded MCP paths must be unique; duplicate '{value}'").into());
2456 }
2457 }
2458 Ok(())
2459}
2460
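// Illustrative sketch (not part of the original module): demonstrates the path
// rules enforced by `validate_mcp_paths` above (leading '/', not the root, not
// a reserved listener path, all three distinct). The inputs are arbitrary
// examples.
#[cfg(test)]
mod validate_mcp_paths_sketch {
    use super::validate_mcp_paths;

    #[test]
    fn accepts_distinct_non_reserved_paths() {
        assert!(validate_mcp_paths("/mcp", "/sse", "/messages").is_ok());
    }

    #[test]
    fn rejects_reserved_duplicate_root_and_relative_paths() {
        assert!(validate_mcp_paths("/health", "/sse", "/messages").is_err());
        assert!(validate_mcp_paths("/mcp", "/mcp", "/messages").is_err());
        assert!(validate_mcp_paths("/", "/sse", "/messages").is_err());
        assert!(validate_mcp_paths("mcp", "/sse", "/messages").is_err());
    }
}
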
2461async fn append_lifecycle_event(
2462 log: &Arc<AnyEventLog>,
2463 kind: &str,
2464 payload: JsonValue,
2465) -> Result<(), OrchestratorError> {
2466 let topic =
2467 harn_vm::event_log::Topic::new(LIFECYCLE_TOPIC).map_err(|error| error.to_string())?;
2468 log.append(&topic, harn_vm::event_log::LogEvent::new(kind, payload))
2469 .await
2470 .map(|_| ())
2471 .map_err(|error| {
2472 OrchestratorError::Serve(format!(
2473 "failed to append orchestrator lifecycle event: {error}"
2474 ))
2475 })
2476}
2477
2478async fn append_pump_lifecycle_event(
2479 log: &Arc<AnyEventLog>,
2480 kind: &str,
2481 payload: JsonValue,
2482) -> Result<(), OrchestratorError> {
2483 append_lifecycle_event(log, kind, payload).await
2484}
2485
2486async fn wait_for_pump_drain_release(
2487 log: &Arc<AnyEventLog>,
2488 topic: &harn_vm::event_log::Topic,
2489 event_id: u64,
2490 gate_rx: &mut watch::Receiver<bool>,
2491) -> Result<(), OrchestratorError> {
2492 if !*gate_rx.borrow() {
2493 return Ok(());
2494 }
2495 append_pump_lifecycle_event(
2496 log,
2497 "pump_drain_waiting",
2498 json!({
2499 "topic": topic.as_str(),
2500 "event_log_id": event_id,
2501 }),
2502 )
2503 .await?;
2504 while *gate_rx.borrow() {
2505 if gate_rx.changed().await.is_err() {
2506 break;
2507 }
2508 }
2509 Ok(())
2510}
2511
2512#[cfg_attr(not(unix), allow(dead_code))]
2513async fn append_manifest_event(
2514 log: &Arc<AnyEventLog>,
2515 kind: &str,
2516 payload: JsonValue,
2517) -> Result<(), OrchestratorError> {
2518 let topic =
2519 harn_vm::event_log::Topic::new(MANIFEST_TOPIC).map_err(|error| error.to_string())?;
2520 log.append(&topic, harn_vm::event_log::LogEvent::new(kind, payload))
2521 .await
2522 .map(|_| ())
2523 .map_err(|error| {
2524 OrchestratorError::Serve(format!(
2525 "failed to append orchestrator manifest event: {error}"
2526 ))
2527 })
2528}
2529
2530async fn record_pump_metrics(
2531 metrics: &harn_vm::MetricsRegistry,
2532 log: &Arc<AnyEventLog>,
2533 topic: &harn_vm::event_log::Topic,
2534 last_seen: u64,
2535 outstanding: usize,
2536) -> Result<(), OrchestratorError> {
2537 let latest = log.latest(topic).await.ok().flatten().unwrap_or(last_seen);
2538 let backlog = latest.saturating_sub(last_seen);
2539 metrics.set_orchestrator_pump_outstanding(topic.as_str(), outstanding);
2540 metrics.set_orchestrator_pump_backlog(topic.as_str(), backlog);
2541 append_pump_lifecycle_event(
2542 log,
2543 "pump_metrics_recorded",
2544 json!({
2545 "topic": topic.as_str(),
2546 "latest": latest,
2547 "last_seen": last_seen,
2548 "backlog": backlog,
2549 "outstanding": outstanding,
2550 }),
2551 )
2552 .await
2553}
2554
2555fn admission_delay(occurred_at_ms: i64) -> Duration {
2556 let now = std::time::SystemTime::now()
2557 .duration_since(std::time::UNIX_EPOCH)
2558 .unwrap_or_default()
2559 .as_millis() as i64;
2560 Duration::from_millis(now.saturating_sub(occurred_at_ms).max(0) as u64)
2561}
2562
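// Illustrative sketch (not part of the original module): `admission_delay`
// clamps negative differences, so an `occurred_at` timestamp ahead of the local
// clock yields a zero delay rather than a bogus large one.
#[cfg(test)]
mod admission_delay_sketch {
    use super::admission_delay;
    use std::time::Duration;

    #[test]
    fn future_timestamps_clamp_to_zero() {
        let one_hour_ahead = std::time::SystemTime::now()
            .duration_since(std::time::UNIX_EPOCH)
            .unwrap_or_default()
            .as_millis() as i64
            + 3_600_000;
        assert_eq!(admission_delay(one_hour_ahead), Duration::ZERO);
    }
}
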
2563async fn emit_drain_truncated(
2564 log: &Arc<AnyEventLog>,
2565 topic_name: &str,
2566 report: PumpDrainReport,
2567 config: DrainConfig,
2568) -> Result<(), OrchestratorError> {
2569 if !report.truncated() {
2570 return Ok(());
2571 }
2572 eprintln!(
2573 "[harn] warning: pump drain truncated for {topic_name}: remaining_queued={} drain_items={} reason={}",
2574 report.remaining_queued,
2575 report.drain_items,
2576 report.stop_reason.as_str()
2577 );
2578 append_lifecycle_event(
2579 log,
2580 "drain_truncated",
2581 json!({
2582 "topic": topic_name,
2583 "remaining_queued": report.remaining_queued,
2584 "drain_items": report.drain_items,
2585 "outstanding_tasks": report.outstanding_tasks,
2586 "max_items": config.max_items,
2587 "deadline_secs": config.deadline.as_secs(),
2588 "reason": report.stop_reason.as_str(),
2589 }),
2590 )
2591 .await
2592}
2593
2594async fn topic_latest_id(
2595 log: &Arc<AnyEventLog>,
2596 topic_name: &str,
2597) -> Result<u64, OrchestratorError> {
2598 let topic = harn_vm::event_log::Topic::new(topic_name).map_err(|error| error.to_string())?;
2599 log.latest(&topic)
2600 .await
2601 .map(|value| value.unwrap_or(0))
2602 .map_err(|error| {
2603 OrchestratorError::Serve(format!(
2604 "failed to read topic head for {topic_name}: {error}"
2605 ))
2606 })
2607}
2608
2609async fn drain_pump_best_effort(
2610 log: &Arc<AnyEventLog>,
2611 topic_name: &str,
2612 pump: PumpHandle,
2613 config: DrainConfig,
2614 overall_deadline: tokio::time::Instant,
2615) -> Result<PumpDrainReport, OrchestratorError> {
2616 let topic = harn_vm::event_log::Topic::new(topic_name).map_err(|error| error.to_string())?;
2617 let consumer = pump_consumer_id(&topic)?;
2618 let start_seen = log
2619 .consumer_cursor(&topic, &consumer)
2620 .await
2621 .map_err(|error| format!("failed to read consumer cursor for {topic_name}: {error}"))?
2622 .unwrap_or(0);
2623 let up_to = log
2624 .latest(&topic)
2625 .await
2626 .map_err(|error| format!("failed to read topic head for {topic_name}: {error}"))?
2627 .unwrap_or(0);
2628 let budget = remaining_budget(overall_deadline);
2629
2630 match tokio::time::timeout(
2631 budget,
2632 pump.drain(log, topic_name, up_to, config, overall_deadline),
2633 )
2634 .await
2635 {
2636 Ok(Ok(report)) => Ok(report),
2637 Ok(Err(error)) => {
2638 eprintln!("[harn] warning: pump drain error for {topic_name}: {error}");
2639 best_effort_pump_report(
2640 log,
2641 &topic,
2642 &consumer,
2643 start_seen,
2644 up_to,
2645 PumpDrainStopReason::Error,
2646 )
2647 .await
2648 }
2649 Err(_) => {
2650 eprintln!(
2651 "[harn] warning: pump drain timed out for {topic_name} after {:?}",
2652 budget
2653 );
2654 best_effort_pump_report(
2655 log,
2656 &topic,
2657 &consumer,
2658 start_seen,
2659 up_to,
2660 PumpDrainStopReason::Deadline,
2661 )
2662 .await
2663 }
2664 }
2665}
2666
2667async fn best_effort_pump_report(
2668 log: &Arc<AnyEventLog>,
2669 topic: &harn_vm::event_log::Topic,
2670 consumer: &ConsumerId,
2671 start_seen: u64,
2672 up_to: u64,
2673 stop_reason: PumpDrainStopReason,
2674) -> Result<PumpDrainReport, OrchestratorError> {
2675 let last_seen = log
2676 .consumer_cursor(topic, consumer)
2677 .await
2678 .map_err(|error| format!("failed to read consumer cursor for {topic}: {error}"))?
2679 .unwrap_or(start_seen);
2680 let stats = PumpStats {
2681 last_seen,
2682 processed: last_seen.saturating_sub(start_seen),
2683 };
2684 Ok(pump_drain_report(stats, start_seen, up_to, 0, stop_reason))
2685}
2686
2687fn remaining_budget(deadline: tokio::time::Instant) -> Duration {
2688 deadline.saturating_duration_since(tokio::time::Instant::now())
2689}
2690
2691fn maybe_finish_pump_drain(
2692 stats: PumpStats,
2693 progress: PumpDrainProgress,
2694 outstanding_tasks: usize,
2695) -> Option<PumpDrainReport> {
2696 if stats.last_seen >= progress.request.up_to && outstanding_tasks == 0 {
2697 return Some(pump_drain_report(
2698 stats,
2699 progress.start_seen,
2700 progress.request.up_to,
2701 outstanding_tasks,
2702 PumpDrainStopReason::Drained,
2703 ));
2704 }
2705 if outstanding_tasks > 0 {
2706 if tokio::time::Instant::now() >= progress.request.deadline {
2707 return Some(pump_drain_report(
2708 stats,
2709 progress.start_seen,
2710 progress.request.up_to,
2711 outstanding_tasks,
2712 PumpDrainStopReason::Deadline,
2713 ));
2714 }
2715 return None;
2716 }
2717 let drain_items = stats.last_seen.saturating_sub(progress.start_seen);
2718 if drain_items >= progress.request.config.max_items as u64 {
2719 return Some(pump_drain_report(
2720 stats,
2721 progress.start_seen,
2722 progress.request.up_to,
2723 outstanding_tasks,
2724 PumpDrainStopReason::MaxItems,
2725 ));
2726 }
2727 if tokio::time::Instant::now() >= progress.request.deadline {
2728 return Some(pump_drain_report(
2729 stats,
2730 progress.start_seen,
2731 progress.request.up_to,
2732 outstanding_tasks,
2733 PumpDrainStopReason::Deadline,
2734 ));
2735 }
2736 None
2737}
2738
2739fn pump_drain_report(
2740 stats: PumpStats,
2741 start_seen: u64,
2742 up_to: u64,
2743 outstanding_tasks: usize,
2744 stop_reason: PumpDrainStopReason,
2745) -> PumpDrainReport {
2746 PumpDrainReport {
2747 stats,
2748 drain_items: stats.last_seen.saturating_sub(start_seen),
2749 remaining_queued: up_to.saturating_sub(stats.last_seen),
2750 outstanding_tasks,
2751 stop_reason,
2752 }
2753}
2754
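// Illustrative sketch (not part of the original module): shows how
// `pump_drain_report` derives the drained/remaining counters from the cursor
// positions. The numbers are arbitrary example values.
#[cfg(test)]
mod pump_drain_report_sketch {
    use super::*;

    #[test]
    fn computes_drained_and_remaining_counts() {
        let stats = PumpStats {
            last_seen: 10,
            processed: 5,
        };
        let report = pump_drain_report(stats, 5, 12, 0, PumpDrainStopReason::Deadline);
        assert_eq!(report.drain_items, 5);
        assert_eq!(report.remaining_queued, 2);
        assert_eq!(report.outstanding_tasks, 0);
        assert!(matches!(report.stop_reason, PumpDrainStopReason::Deadline));
    }
}
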
2755fn pump_consumer_id(topic: &harn_vm::event_log::Topic) -> Result<ConsumerId, OrchestratorError> {
2756 ConsumerId::new(format!("orchestrator-pump.{}", topic.as_str())).map_err(|error| {
2757 OrchestratorError::Serve(format!("failed to create consumer id for {topic}: {error}"))
2758 })
2759}
2760
2761fn inbox_task_test_release_file() -> Option<PathBuf> {
2762 test_file_from_env(TEST_INBOX_TASK_RELEASE_FILE_ENV)
2763}
2764
2765fn test_file_from_env(key: &str) -> Option<PathBuf> {
2766 std::env::var_os(key)
2767 .filter(|value| !value.is_empty())
2768 .map(PathBuf::from)
2769}
2770
2771async fn wait_for_test_release_file(path: &Path) {
2772 while tokio::fs::metadata(path).await.is_err() {
2773 tokio::time::sleep(Duration::from_millis(10)).await;
2774 }
2775}
2776
2777fn pending_pump_test_should_fail() -> bool {
2778 std::env::var(TEST_FAIL_PENDING_PUMP_ENV)
2779 .ok()
2780 .is_some_and(|value| value != "0")
2781}
2782
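// Illustrative sketch (not part of the original module): the test-only hooks
// above ignore empty env values and treat anything other than "0" as an enabled
// failure injection. Reuses the crate's env-lock and ScopedEnvVar test guards;
// the marker path is an example value and the guard is assumed to restore the
// variable on drop.
#[cfg(test)]
mod test_hook_env_sketch {
    use super::{
        pending_pump_test_should_fail, test_file_from_env, TEST_FAIL_PENDING_PUMP_ENV,
        TEST_INBOX_TASK_RELEASE_FILE_ENV,
    };
    use std::path::PathBuf;

    #[tokio::test]
    async fn hooks_follow_their_env_values() {
        let _env_lock = crate::tests::common::env_lock::lock_env().lock().await;
        let _release = crate::env_guard::ScopedEnvVar::set(
            TEST_INBOX_TASK_RELEASE_FILE_ENV,
            "/tmp/release.marker",
        );
        assert_eq!(
            test_file_from_env(TEST_INBOX_TASK_RELEASE_FILE_ENV),
            Some(PathBuf::from("/tmp/release.marker"))
        );
        {
            let _fail = crate::env_guard::ScopedEnvVar::set(TEST_FAIL_PENDING_PUMP_ENV, "0");
            assert!(!pending_pump_test_should_fail());
        }
        let _fail = crate::env_guard::ScopedEnvVar::set(TEST_FAIL_PENDING_PUMP_ENV, "1");
        assert!(pending_pump_test_should_fail());
    }
}
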
2783fn spawn_manifest_watcher(
2784 config_path: PathBuf,
2785 reload: AdminReloadHandle,
2786) -> Result<notify::RecommendedWatcher, OrchestratorError> {
2787 use notify::{Event, EventKind, RecursiveMode, Watcher};
2788
2789 let watch_dir = config_path.parent().ok_or_else(|| {
2790 format!(
2791 "manifest has no parent directory: {}",
2792 config_path.display()
2793 )
2794 })?;
2795 let target_name = config_path
2796 .file_name()
2797 .and_then(|name| name.to_str())
2798 .ok_or_else(|| {
2799 format!(
2800 "manifest path is not valid UTF-8: {}",
2801 config_path.display()
2802 )
2803 })?
2804 .to_string();
2805 let (tx, mut rx) = mpsc::unbounded_channel::<()>();
2806 tokio::task::spawn_local(async move {
2807 while rx.recv().await.is_some() {
2808 tokio::time::sleep(Duration::from_millis(200)).await;
2809 while rx.try_recv().is_ok() {}
2810 let _ = reload.trigger("file_watch");
2811 }
2812 });
2813 let mut watcher =
2814 notify::recommended_watcher(move |res: Result<Event, notify::Error>| match res {
2815 Ok(event)
2816 if matches!(
2817 event.kind,
2818 EventKind::Modify(_)
2819 | EventKind::Create(_)
2820 | EventKind::Remove(_)
2821 | EventKind::Any
2822 ) && event.paths.iter().any(|path| {
2823 path.file_name()
2824 .and_then(|name| name.to_str())
2825 .is_some_and(|name| name == target_name)
2826 }) =>
2827 {
2828 let _ = tx.send(());
2829 }
2830 _ => {}
2831 })
2832 .map_err(|error| format!("failed to create manifest watcher: {error}"))?;
2833 watcher
2834 .watch(watch_dir, RecursiveMode::NonRecursive)
2835 .map_err(|error| {
2836 format!(
2837 "failed to watch manifest directory {}: {error}",
2838 watch_dir.display()
2839 )
2840 })?;
2841 Ok(watcher)
2842}
2843
2844pub(crate) fn load_manifest(config_path: &Path) -> Result<(Manifest, PathBuf), OrchestratorError> {
2845 if !config_path.is_file() {
2846 return Err(format!("manifest not found: {}", config_path.display()).into());
2847 }
2848 let content = std::fs::read_to_string(config_path)
2849 .map_err(|error| format!("failed to read {}: {error}", config_path.display()))?;
2850 let manifest = toml::from_str::<Manifest>(&content)
2851 .map_err(|error| format!("failed to parse {}: {error}", config_path.display()))?;
2852 let manifest_dir = config_path.parent().map(Path::to_path_buf).ok_or_else(|| {
2853 format!(
2854 "manifest has no parent directory: {}",
2855 config_path.display()
2856 )
2857 })?;
2858 Ok((manifest, manifest_dir))
2859}
2860
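// Illustrative sketch (not part of the original module): `load_manifest`
// reports a missing file as an error instead of panicking; the temp directory
// and file name are example values.
#[cfg(test)]
mod load_manifest_sketch {
    use super::load_manifest;

    #[test]
    fn missing_manifest_is_an_error() {
        let temp = tempfile::TempDir::new().unwrap();
        let missing = temp.path().join("does-not-exist.toml");
        assert!(load_manifest(&missing).is_err());
    }
}
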
2861pub(crate) fn absolutize_from_cwd(path: &Path) -> Result<PathBuf, OrchestratorError> {
2862 let candidate = if path.is_absolute() {
2863 path.to_path_buf()
2864 } else {
2865 std::env::current_dir()
2866 .map_err(|error| format!("failed to read current directory: {error}"))?
2867 .join(path)
2868 };
2869 Ok(candidate)
2870}
2871
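// Illustrative sketch (not part of the original module): relative paths are
// resolved against the current working directory, while absolute paths pass
// through unchanged.
#[cfg(test)]
mod absolutize_from_cwd_sketch {
    use super::absolutize_from_cwd;
    use std::path::Path;

    #[test]
    fn resolves_relative_paths_against_cwd() {
        let cwd = std::env::current_dir().unwrap();
        let absolute = absolutize_from_cwd(&cwd)
            .map_err(|error| error.to_string())
            .expect("absolute path");
        assert_eq!(absolute, cwd);

        let resolved = absolutize_from_cwd(Path::new("harn.toml"))
            .map_err(|error| error.to_string())
            .expect("relative path");
        assert!(resolved.is_absolute());
        assert!(resolved.ends_with("harn.toml"));
    }
}
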
2872fn configured_secret_chain_display() -> String {
2873 std::env::var(harn_vm::secrets::SECRET_PROVIDER_CHAIN_ENV)
2874 .unwrap_or_else(|_| harn_vm::secrets::DEFAULT_SECRET_PROVIDER_CHAIN.to_string())
2875 .split(',')
2876 .map(str::trim)
2877 .filter(|segment| !segment.is_empty())
2878 .collect::<Vec<_>>()
2879 .join(" -> ")
2880}
2881
2882fn secret_namespace_for(manifest_dir: &Path) -> String {
2883 match std::env::var("HARN_SECRET_NAMESPACE") {
2884 Ok(namespace) if !namespace.trim().is_empty() => namespace,
2885 _ => {
2886 let leaf = manifest_dir
2887 .file_name()
2888 .and_then(|name| name.to_str())
2889 .filter(|name| !name.is_empty())
2890 .unwrap_or("workspace");
2891 format!("harn/{leaf}")
2892 }
2893 }
2894}
2895
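// Illustrative sketch (not part of the original module): shows how the secret
// provider chain display and namespace fallback are derived from the
// environment. It reuses the crate's env-lock and ScopedEnvVar test guards and
// assumes the guard restores the previous value on drop; the chain string and
// workspace path are examples only.
#[cfg(test)]
mod secret_env_sketch {
    use super::{configured_secret_chain_display, secret_namespace_for};
    use std::path::Path;

    #[tokio::test]
    async fn chain_display_and_namespace_fallback() {
        let _env_lock = crate::tests::common::env_lock::lock_env().lock().await;
        let _chain = crate::env_guard::ScopedEnvVar::set(
            harn_vm::secrets::SECRET_PROVIDER_CHAIN_ENV,
            "env, file ,,vault",
        );
        assert_eq!(configured_secret_chain_display(), "env -> file -> vault");

        let _namespace = crate::env_guard::ScopedEnvVar::set("HARN_SECRET_NAMESPACE", " ");
        assert_eq!(
            secret_namespace_for(Path::new("/tmp/demo-workspace")),
            "harn/demo-workspace"
        );
    }
}
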
2896fn now_rfc3339() -> Result<String, OrchestratorError> {
2897 OffsetDateTime::now_utc()
2898 .format(&Rfc3339)
2899 .map_err(|error| OrchestratorError::Serve(format!("failed to format timestamp: {error}")))
2900}
2901
2902fn write_state_snapshot(
2903 path: &Path,
2904 snapshot: &ServeStateSnapshot,
2905) -> Result<(), OrchestratorError> {
2906 let encoded = serde_json::to_vec_pretty(snapshot)
2907 .map_err(|error| format!("failed to encode orchestrator state snapshot: {error}"))?;
2908 if let Some(parent) = path.parent() {
2909 std::fs::create_dir_all(parent)
2910 .map_err(|error| format!("failed to create {}: {error}", parent.display()))?;
2911 }
2912 std::fs::write(path, encoded).map_err(|error| {
2913 OrchestratorError::Serve(format!("failed to write {}: {error}", path.display()))
2914 })
2915}
2916
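// Illustrative sketch (not part of the original module): `write_state_snapshot`
// creates missing parent directories and writes pretty-printed JSON. All field
// values below are placeholders.
#[cfg(test)]
mod write_state_snapshot_sketch {
    use super::*;

    #[test]
    fn writes_snapshot_and_creates_parent_dirs() {
        let temp = tempfile::TempDir::new().unwrap();
        let path = temp.path().join("nested").join(STATE_SNAPSHOT_FILE);
        let snapshot = ServeStateSnapshot {
            status: "running".to_string(),
            role: "single-tenant".to_string(),
            bind: "127.0.0.1:0".to_string(),
            listener_url: "http://127.0.0.1:0".to_string(),
            manifest_path: "harn.toml".to_string(),
            state_dir: "state".to_string(),
            started_at: "2024-01-01T00:00:00Z".to_string(),
            stopped_at: None,
            secret_provider_chain: "env".to_string(),
            event_log_backend: "memory".to_string(),
            event_log_location: None,
            triggers: Vec::new(),
            connectors: Vec::new(),
            activations: Vec::new(),
        };
        write_state_snapshot(&path, &snapshot)
            .map_err(|error| error.to_string())
            .expect("write snapshot");
        let raw = std::fs::read_to_string(&path).unwrap();
        let parsed: serde_json::Value = serde_json::from_str(&raw).unwrap();
        assert_eq!(parsed["status"], "running");
    }
}
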
2917#[derive(Debug, Serialize)]
2920struct ServeStateSnapshot {
2921 status: String,
2922 role: String,
2923 bind: String,
2924 listener_url: String,
2925 manifest_path: String,
2926 state_dir: String,
2927 started_at: String,
2928 stopped_at: Option<String>,
2929 secret_provider_chain: String,
2930 event_log_backend: String,
2931 event_log_location: Option<String>,
2932 triggers: Vec<TriggerStateSnapshot>,
2933 connectors: Vec<String>,
2934 activations: Vec<ConnectorActivationSnapshot>,
2935}
2936
2937#[derive(Debug, Serialize)]
2938struct TriggerStateSnapshot {
2939 id: String,
2940 provider: String,
2941 kind: String,
2942 handler: String,
2943 version: Option<u32>,
2944 state: Option<String>,
2945 received: u64,
2946 dispatched: u64,
2947 failed: u64,
2948 in_flight: u64,
2949}
2950
2951#[derive(Debug, Serialize)]
2952struct ConnectorActivationSnapshot {
2953 provider: String,
2954 binding_count: usize,
2955}
2956
2957struct PlaceholderConnector {
2960 provider_id: harn_vm::ProviderId,
2961 kinds: Vec<harn_vm::TriggerKind>,
2962 _ctx: Option<harn_vm::ConnectorCtx>,
2963}
2964
2965impl PlaceholderConnector {
2966 fn new(provider_id: harn_vm::ProviderId, kinds: BTreeSet<String>) -> Self {
2967 Self {
2968 provider_id,
2969 kinds: kinds.into_iter().map(harn_vm::TriggerKind::from).collect(),
2970 _ctx: None,
2971 }
2972 }
2973}
2974
2975struct PlaceholderClient;
2976
2977#[async_trait]
2978impl harn_vm::ConnectorClient for PlaceholderClient {
2979 async fn call(
2980 &self,
2981 method: &str,
2982 _args: JsonValue,
2983 ) -> Result<JsonValue, harn_vm::ClientError> {
2984 Err(harn_vm::ClientError::Other(format!(
2985 "connector client method '{method}' is not implemented in the orchestrator scaffold"
2986 )))
2987 }
2988}
2989
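// Illustrative sketch (not part of the original module): the placeholder client
// rejects every call, which is what providers without a real connector surface
// to callers. The method name is an example.
#[cfg(test)]
mod placeholder_client_sketch {
    use super::PlaceholderClient;
    use harn_vm::ConnectorClient;

    #[tokio::test]
    async fn rejects_all_calls() {
        let client = PlaceholderClient;
        let result = client.call("send_message", serde_json::json!({})).await;
        assert!(result.is_err());
    }
}
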
2990#[async_trait]
2991impl harn_vm::Connector for PlaceholderConnector {
2992 fn provider_id(&self) -> &harn_vm::ProviderId {
2993 &self.provider_id
2994 }
2995
2996 fn kinds(&self) -> &[harn_vm::TriggerKind] {
2997 &self.kinds
2998 }
2999
3000 async fn init(&mut self, ctx: harn_vm::ConnectorCtx) -> Result<(), harn_vm::ConnectorError> {
3001 self._ctx = Some(ctx);
3002 Ok(())
3003 }
3004
3005 async fn activate(
3006 &self,
3007 bindings: &[harn_vm::TriggerBinding],
3008 ) -> Result<harn_vm::ActivationHandle, harn_vm::ConnectorError> {
3009 Ok(harn_vm::ActivationHandle::new(
3010 self.provider_id.clone(),
3011 bindings.len(),
3012 ))
3013 }
3014
3015 async fn normalize_inbound(
3016 &self,
3017 _raw: harn_vm::RawInbound,
3018 ) -> Result<harn_vm::TriggerEvent, harn_vm::ConnectorError> {
3019 Err(harn_vm::ConnectorError::Unsupported(format!(
3020 "connector '{}' inbound normalization is not implemented yet",
3021 self.provider_id.as_str()
3022 )))
3023 }
3024
3025 fn payload_schema(&self) -> harn_vm::ProviderPayloadSchema {
3026 harn_vm::ProviderPayloadSchema::named("TriggerEvent")
3027 }
3028
3029 fn client(&self) -> Arc<dyn harn_vm::ConnectorClient> {
3030 Arc::new(PlaceholderClient)
3031 }
3032}
3033
3034#[cfg(test)]
3035mod tests {
3036 use super::*;
3037 use futures::StreamExt;
3038 use harn_vm::event_log::{EventLog, Topic};
3039
3040 fn write_test_file(dir: &Path, relative: &str, contents: &str) {
3041 let path = dir.join(relative);
3042 if let Some(parent) = path.parent() {
3043 std::fs::create_dir_all(parent).unwrap();
3044 }
3045 std::fs::write(path, contents).unwrap();
3046 }
3047
3048 fn stream_manifest_fixture() -> &'static str {
3049 r#"
3050[package]
3051name = "fixture"
3052
3053[exports]
3054handlers = "lib.harn"
3055
3056[[triggers]]
3057id = "ws-stream"
3058kind = "stream"
3059provider = "websocket"
3060path = "/streams/ws"
3061match = { events = ["quote.tick"] }
3062handler = "handlers::on_stream"
3063"#
3064 }
3065
3066 fn stream_handler_fixture(marker_path: &Path) -> String {
3067 format!(
3068 r#"
3069import "std/triggers"
3070
3071pub fn on_stream(event: TriggerEvent) {{
3072 write_file({marker:?}, json_stringify({{
3073 provider: event.provider,
3074 kind: event.kind,
3075 key: event.provider_payload.key,
3076 stream: event.provider_payload.stream,
3077 amount: event.provider_payload.raw.value.amount,
3078 }}))
3079}}
3080"#,
3081 marker = marker_path.display().to_string()
3082 )
3083 }
3084
3085 #[tokio::test(flavor = "multi_thread")]
3089 async fn stream_trigger_route_uses_generic_stream_connector_in_process() {
3090 let _env_lock = crate::tests::common::env_lock::lock_env().lock().await;
3093 let _secret_providers = crate::env_guard::ScopedEnvVar::set("HARN_SECRET_PROVIDERS", "env");
3094
3095 let temp = tempfile::TempDir::new().unwrap();
3096 let marker_path = temp.path().join("stream-handler.json");
3097 write_test_file(temp.path(), "harn.toml", stream_manifest_fixture());
3098 write_test_file(
3099 temp.path(),
3100 "lib.harn",
3101 &stream_handler_fixture(&marker_path),
3102 );
3103
3104 let config =
3105 OrchestratorConfig::for_test(temp.path().join("harn.toml"), temp.path().join("state"));
3106 let harness = OrchestratorHarness::start(config)
3107 .await
3108 .expect("harness start");
3109 let base_url = harness.listener_url().to_string();
3110 let event_log = harness.event_log();
3111
3112 let response = reqwest::Client::new()
3113 .post(format!("{base_url}/streams/ws"))
3114 .header("content-type", "application/json")
3115 .json(&serde_json::json!({
3116 "key": "acct-1",
3117 "stream": "quotes",
3118 "value": {"amount": 10}
3119 }))
3120 .send()
3121 .await
3122 .unwrap();
3123 assert_eq!(response.status(), 200);
3124
3125 let topic = Topic::new("orchestrator.lifecycle").unwrap();
3128 let mut stream = event_log.clone().subscribe(&topic, None).await.unwrap();
3129 let deadline = tokio::time::Instant::now() + std::time::Duration::from_secs(30);
3130 loop {
3131 let remaining = deadline
3132 .checked_duration_since(tokio::time::Instant::now())
3133 .expect("timed out waiting for pump_dispatch_completed");
3134 let (_, event) = tokio::time::timeout(remaining, stream.next())
3135 .await
3136 .expect("timed out waiting for pump_dispatch_completed event")
3137 .expect("event stream ended unexpectedly")
3138 .expect("event stream error");
3139 if event.kind == "pump_dispatch_completed"
3140 && event.payload["status"] == serde_json::json!("completed")
3141 {
3142 break;
3143 }
3144 }
3145 drop(stream);
3146
3147 let marker: serde_json::Value =
3148 serde_json::from_str(&std::fs::read_to_string(&marker_path).unwrap()).unwrap();
3149 assert_eq!(
3150 marker.get("provider").and_then(|v| v.as_str()),
3151 Some("websocket")
3152 );
3153 assert_eq!(
3154 marker.get("kind").and_then(|v| v.as_str()),
3155 Some("quote.tick")
3156 );
3157 assert_eq!(marker.get("key").and_then(|v| v.as_str()), Some("acct-1"));
3158 assert_eq!(
3159 marker.get("stream").and_then(|v| v.as_str()),
3160 Some("quotes")
3161 );
3162 assert_eq!(marker.get("amount").and_then(|v| v.as_i64()), Some(10));
3163
3164 harness
3165 .shutdown(std::time::Duration::from_secs(5))
3166 .await
3167 .expect("harness shutdown");
3168 }
3169}