use std::net::SocketAddr;
use std::path::{Path, PathBuf};
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::{oneshot, watch};
use harn_vm::event_log::AnyEventLog;
use super::errors::OrchestratorError;
use super::listener::AdminReloadHandle;
mod audit;
mod config;
mod lifecycle;
mod pumps;
mod reload;
mod routing;
mod shutdown;
pub use config::{DrainConfig, HarnessError, OrchestratorConfig, PumpConfig, ShutdownReport};
pub(crate) use lifecycle::{absolutize_from_cwd, load_manifest};
/// Event-log topic string for orchestrator lifecycle messages.
const LIFECYCLE_TOPIC: &str = "orchestrator.lifecycle";
/// Event-log topic string for manifest messages.
/// Only referenced on unix targets, hence the conditional dead_code allowance.
#[cfg_attr(not(unix), allow(dead_code))]
const MANIFEST_TOPIC: &str = "orchestrator.manifest";
/// File name (inside the state dir) for the persisted orchestrator state snapshot.
const STATE_SNAPSHOT_FILE: &str = "orchestrator-state.json";
/// Event-log topic string for pending trigger messages.
const PENDING_TOPIC: &str = "orchestrator.triggers.pending";
/// Event-log topic string for cron-connector tick messages.
const CRON_TICK_TOPIC: &str = "connectors.cron.tick";
/// Env var consumed by tests — presumably names a file used to gate inbox-task
/// release; confirm against harness_tests usage.
const TEST_INBOX_TASK_RELEASE_FILE_ENV: &str = "HARN_TEST_ORCHESTRATOR_INBOX_TASK_RELEASE_FILE";
/// Env var consumed by tests — presumably forces the pending pump to fail for
/// error-path coverage; confirm against harness_tests usage.
const TEST_FAIL_PENDING_PUMP_ENV: &str = "HARN_TEST_ORCHESTRATOR_FAIL_PENDING_PUMP";
/// Handle to an orchestrator running on a dedicated background thread with its
/// own Tokio runtime. Created via [`OrchestratorHarness::start`]; torn down via
/// [`OrchestratorHarness::shutdown`] or, as a fallback, `Drop`.
#[allow(dead_code)]
pub struct OrchestratorHarness {
    /// Shared event log handed back by the orchestrator at readiness.
    event_log: Arc<AnyEventLog>,
    /// URL of the orchestrator's listener endpoint.
    listener_url: String,
    /// Bound socket address of the listener.
    local_addr: SocketAddr,
    /// Directory where orchestrator state lives (see `STATE_SNAPSHOT_FILE`).
    state_dir: PathBuf,
    /// Handle for triggering admin-initiated reloads on the listener.
    admin_reload: AdminReloadHandle,
    /// Watch channel used to signal shutdown to the orchestrator task;
    /// sending `true` initiates teardown.
    shutdown_tx: Arc<watch::Sender<bool>>,
    /// Gate that lets tests pause/release pump draining.
    pump_drain_gate: pumps::PumpDrainGate,
    /// Join handle for the background thread; `None` once consumed by
    /// `shutdown` so `Drop` does not join twice.
    join: Option<std::thread::JoinHandle<()>>,
}
/// Payload sent over the readiness oneshot once the orchestrator task has
/// started successfully; its fields are moved into [`OrchestratorHarness`].
pub(super) struct ReadyState {
    /// Shared event log created by the orchestrator.
    pub(super) event_log: Arc<AnyEventLog>,
    /// URL of the listener endpoint.
    pub(super) listener_url: String,
    /// Bound socket address of the listener.
    pub(super) local_addr: SocketAddr,
    /// Directory where orchestrator state lives.
    pub(super) state_dir: PathBuf,
    /// Handle for triggering admin-initiated reloads.
    pub(super) admin_reload: AdminReloadHandle,
}
#[allow(dead_code)]
impl OrchestratorHarness {
    /// Spawns the orchestrator on a dedicated background thread (with its own
    /// multi-threaded Tokio runtime plus a `LocalSet` so the task may hold
    /// `!Send` futures) and waits for it to signal readiness.
    ///
    /// # Errors
    /// Returns a `HarnessError` if the orchestrator reports a startup error or
    /// if the background thread exits before signaling readiness (e.g. panic).
    pub async fn start(config: OrchestratorConfig) -> Result<Self, HarnessError> {
        let (ready_tx, ready_rx) = oneshot::channel::<Result<ReadyState, OrchestratorError>>();
        let (shutdown_tx, shutdown_rx) = watch::channel(false);
        let shutdown_tx = Arc::new(shutdown_tx);
        let pump_drain_gate = pumps::PumpDrainGate::new();
        let task_pump_drain_gate = pump_drain_gate.clone();
        // Dedicated thread + runtime isolates the orchestrator from the
        // caller's executor.
        let join = std::thread::spawn(move || {
            let rt = tokio::runtime::Builder::new_multi_thread()
                .worker_threads(2)
                .enable_all()
                .build()
                .expect("failed to build OrchestratorHarness tokio runtime");
            let local = tokio::task::LocalSet::new();
            rt.block_on(local.run_until(lifecycle::orchestrator_task(
                config,
                ready_tx,
                shutdown_rx,
                task_pump_drain_gate,
            )));
        });
        match ready_rx.await {
            Ok(Ok(ready)) => Ok(Self {
                event_log: ready.event_log,
                listener_url: ready.listener_url,
                local_addr: ready.local_addr,
                state_dir: ready.state_dir,
                admin_reload: ready.admin_reload,
                shutdown_tx,
                pump_drain_gate,
                join: Some(join),
            }),
            Ok(Err(error)) => {
                // Startup failed; the task has already returned its error, so
                // this blocking join is expected to be brief.
                let _ = join.join();
                Err(HarnessError::from(error))
            }
            Err(_) => {
                // Readiness sender dropped without sending: the thread died
                // (most likely a panic) before the orchestrator came up.
                let _ = join.join();
                Err(HarnessError(
                    "harness thread exited before signaling readiness".to_string(),
                ))
            }
        }
    }

    /// URL of the orchestrator's listener endpoint.
    pub fn listener_url(&self) -> &str {
        &self.listener_url
    }

    /// Bound socket address of the listener.
    pub fn local_addr(&self) -> SocketAddr {
        self.local_addr
    }

    /// Clone of the shared event log handle.
    pub fn event_log(&self) -> Arc<AnyEventLog> {
        self.event_log.clone()
    }

    /// Directory where orchestrator state lives.
    pub fn state_dir(&self) -> &Path {
        &self.state_dir
    }

    /// Handle for triggering admin-initiated reloads.
    pub fn admin_reload(&self) -> AdminReloadHandle {
        self.admin_reload.clone()
    }

    /// Sender half of the shutdown watch channel; sending `true` initiates
    /// orchestrator teardown.
    pub fn shutdown_trigger(&self) -> Arc<watch::Sender<bool>> {
        self.shutdown_tx.clone()
    }

    /// Pauses pump draining (test hook).
    pub fn pause_pump_drain(&self) {
        self.pump_drain_gate.pause();
    }

    /// Releases a previously paused pump drain (test hook).
    pub fn release_pump_drain(&self) {
        self.pump_drain_gate.release();
    }

    /// Signals shutdown and waits up to `deadline` for the background thread
    /// to exit.
    ///
    /// Previously the deadline parameter was ignored (`_deadline`) and the
    /// report always claimed `timed_out: false`; it is now honored, and a
    /// report with `timed_out: true` is returned when the thread does not
    /// finish in time. On timeout the detached `spawn_blocking` task keeps
    /// draining the join in the background.
    ///
    /// # Errors
    /// Returns a `HarnessError` if the join task fails or the background
    /// thread panicked.
    pub async fn shutdown(mut self, deadline: Duration) -> Result<ShutdownReport, HarnessError> {
        let _ = self.shutdown_tx.send(true);
        // Take the handle so the `Drop` impl (which runs when `self` is
        // dropped at the end of this fn) does not try to join again.
        let join = self.join.take().expect("join handle");
        let join_task = tokio::task::spawn_blocking(move || join.join());
        match tokio::time::timeout(deadline, join_task).await {
            Ok(joined) => {
                joined
                    .map_err(|_| HarnessError("spawn_blocking join failed".to_string()))?
                    .map_err(|_| HarnessError("harness background thread panicked".to_string()))?;
                Ok(ShutdownReport { timed_out: false })
            }
            Err(_elapsed) => Ok(ShutdownReport { timed_out: true }),
        }
    }
}
impl Drop for OrchestratorHarness {
    /// Best-effort teardown fallback: signal shutdown, then wait for the
    /// background thread unless it was already joined (join handle taken).
    fn drop(&mut self) {
        let _ = self.shutdown_tx.send(true);
        match self.join.take() {
            Some(handle) => {
                let _ = handle.join();
            }
            None => {}
        }
    }
}
#[cfg(test)]
#[path = "../harness_tests.rs"]
mod harness_tests;