use std::future::Future;
use std::future::IntoFuture;
use std::sync::atomic::Ordering;
use std::time::Duration;
use meerkat_mob::SpawnMemberSpec;
use serde_json::json;
use tokio::runtime::RuntimeFlavor;
use tokio::sync::mpsc::error::TryRecvError;
use crate::mob_handle_runtime::{MobRuntimeError, send_message_on_mob};
use crate::runtime::{
MobkitRuntimeHandle, RuntimeDecisionState, ScheduleDefinition, ScheduleDispatchReport,
ScheduleValidationError,
};
use crate::types::{EventEnvelope, ModuleEvent, UnifiedEvent};
use super::types::{
RediscoverReport, ShutdownDrainReport, UnifiedRuntimeError, UnifiedRuntimeRunReport,
UnifiedRuntimeShutdownReport,
};
use super::{MobEventIngress, UnifiedRuntime, discovery_spec_to_spawn_spec};
impl UnifiedRuntime {
pub async fn rediscover(&self) -> Result<Option<RediscoverReport>, MobRuntimeError> {
match self.rediscover_inner().await {
Ok(report) => Ok(report),
Err(err) => {
self.fire_error(super::types::ErrorEvent::RediscoverFailure {
error: format!("{err}"),
});
Err(err)
}
}
}
/// Resets the mob, re-runs discovery and respawns the discovered members.
///
/// Returns `Ok(None)` without touching the mob when no discovery source is
/// configured. Otherwise the steps run in this order: reset the mob handle,
/// call `discover` with a JSON `null` argument, spawn every discovered spec,
/// clear `managed_dynamic_edges`, and reconcile edges. The report lists the
/// string form of each spawn spec's identity plus whatever
/// `reconcile_edges` returned.
///
/// # Errors
///
/// Returns the reset failure wrapped in `MobRuntimeError::Mob`, or the error
/// produced by `spawn_many`.
async fn rediscover_inner(&self) -> Result<Option<RediscoverReport>, MobRuntimeError> {
    let Some(discovery) = &self.discovery else {
        return Ok(None);
    };

    let reset_outcome = self.mob_runtime.handle().reset().await;
    reset_outcome.map_err(MobRuntimeError::Mob)?;

    let discovered = discovery.discover(serde_json::Value::Null).await;
    let spawn_specs: Vec<SpawnMemberSpec> = discovered
        .iter()
        .map(discovery_spec_to_spawn_spec)
        .collect();
    // Capture the identities before the specs are moved into `spawn_many`.
    let spawned = spawn_specs
        .iter()
        .map(|spec| spec.identity.to_string())
        .collect::<Vec<String>>();
    self.spawn_many(spawn_specs).await?;

    self.managed_dynamic_edges.write().await.clear();
    let edges = self.reconcile_edges().await;

    Ok(Some(RediscoverReport { spawned, edges }))
}
pub async fn run<F>(
&self,
listener: tokio::net::TcpListener,
decisions: RuntimeDecisionState,
shutdown_signal: F,
) -> UnifiedRuntimeRunReport
where
F: Future<Output = ()> + Send + 'static,
{
let app = self.build_reference_app_router(decisions);
let serve = axum::serve(listener, app)
.with_graceful_shutdown(shutdown_signal)
.into_future();
tokio::pin!(serve);
let serve_result = loop {
tokio::select! {
result = &mut serve => break result,
() = tokio::time::sleep(Duration::from_millis(25)) => {
let _ = self.drain_mob_agent_events().await;
}
}
};
let shutdown = self.shutdown().await;
UnifiedRuntimeRunReport {
serve_result,
shutdown,
}
}
pub async fn serve(
&self,
listener: tokio::net::TcpListener,
decisions: RuntimeDecisionState,
) -> std::io::Result<()> {
let app = self.build_reference_app_router(decisions);
let serve = axum::serve(listener, app).into_future();
tokio::pin!(serve);
loop {
tokio::select! {
result = &mut serve => break result,
() = tokio::time::sleep(Duration::from_millis(25)) => {
let _ = self.drain_mob_agent_events().await;
}
}
}
}
/// Spawns a Tokio task that drains mob agent events roughly every 25 ms.
///
/// Each tick sleeps first, then stops the task if the `shutting_down` flag
/// is set. A drain pass that fails with
/// `UnifiedRuntimeError::RuntimeShuttingDown` also stops the task; any other
/// drain error is logged as a warning and the loop continues.
pub fn spawn_event_drain_task(self: std::sync::Arc<Self>) -> tokio::task::JoinHandle<()> {
    tokio::spawn(async move {
        let tick = Duration::from_millis(25);
        loop {
            tokio::time::sleep(tick).await;
            if self.shutting_down.load(Ordering::SeqCst) {
                break;
            }
            match self.drain_mob_agent_events().await {
                Ok(()) => {}
                Err(UnifiedRuntimeError::RuntimeShuttingDown) => break,
                Err(err) => {
                    tracing::warn!(error = %err, "mob agent event drain tick failed");
                }
            }
        }
    })
}
/// Shuts the runtime down and reports what happened at each stage.
///
/// Steps, in order:
/// 1. Set the `shutting_down` flag.
/// 2. Abort the implicit delegate retirement task, if one is registered.
/// 3. Run a bounded drain of mob agent events under `drain_timeout`.
/// 4. Close the event router, shut down the module runtime and stop the mob.
///
/// The drain loop performs at most two passes, with a 50 ms pause after
/// each; it ends early if a drain pass fails or the ingress is already gone.
/// `drained_count` in the report is therefore the number of completed drain
/// passes, not the number of events processed.
pub async fn shutdown(&self) -> UnifiedRuntimeShutdownReport {
    self.shutting_down.store(true, Ordering::SeqCst);
    if let Some(task) = self.implicit_delegate_retirement_task.lock().await.take() {
        task.abort();
    }
    let drain_start = std::time::Instant::now();
    let mut drained_count = 0_usize;
    let drain_result = tokio::time::timeout(self.drain_timeout, async {
        loop {
            if self.drain_mob_agent_events().await.is_err() {
                break;
            }
            let ingress = self.mob_event_ingress.lock().await;
            if ingress.is_none() {
                break;
            }
            // Release the ingress lock before sleeping so other drainers are not blocked.
            drop(ingress);
            drained_count += 1;
            tokio::time::sleep(Duration::from_millis(50)).await;
            if drained_count > 1 {
                break;
            }
        }
    })
    .await;
    // `as_millis` yields a `u128`; saturate instead of silently truncating with `as`.
    let drain_duration_ms = u64::try_from(drain_start.elapsed().as_millis()).unwrap_or(u64::MAX);
    let drain = ShutdownDrainReport {
        drained_count,
        timed_out: drain_result.is_err(),
        drain_duration_ms,
    };
    self.close_event_router().await;
    let module_shutdown = self.module_runtime.lock().await.shutdown();
    let mob_stop = self
        .mob_handle()
        .stop()
        .await
        .map_err(MobRuntimeError::from);
    UnifiedRuntimeShutdownReport {
        drain,
        module_shutdown,
        mob_stop,
    }
}
pub async fn drain_mob_agent_events(&self) -> Result<(), UnifiedRuntimeError> {
let mut disconnected = false;
let mut ingress_guard = match self.mob_event_ingress.try_lock() {
Ok(guard) => guard,
Err(_) => {
return Ok(());
}
};
let ingress = match ingress_guard.as_mut() {
Some(i) => i,
None => return Ok(()),
};
loop {
match Self::try_recv_ingress_event(ingress) {
Some(Ok(unified_event)) => {
if let crate::types::UnifiedEvent::Agent {
ref agent_id,
ref event_type,
..
} = unified_event.event
&& event_type == "run_failed"
{
self.fire_error(super::types::ErrorEvent::HostLoopCrash {
member_id: agent_id.clone(),
error: format!(
"agent run failed (event_id: {})",
unified_event.event_id
),
});
}
self.ingest_event(&unified_event);
self.project_console_event_from_unified(&unified_event)
.await;
self.module_runtime
.lock()
.await
.append_normalized_event(unified_event)?;
}
Some(Err(TryRecvError::Empty)) => break,
Some(Err(TryRecvError::Disconnected)) => {
disconnected = true;
break;
}
None => break,
}
}
if disconnected {
*ingress_guard = None;
}
Ok(())
}
/// Tears down the mob event ingress and the mob events subscriber task.
///
/// The ingress, if present, is removed from its slot and its forwarder task
/// is aborted and awaited. The subscriber task, if present, is likewise
/// removed, aborted and awaited. The join results are ignored.
pub(super) async fn close_event_router(&self) {
    // Take the ingress in its own statement so the ingress lock is released
    // before the forwarder task is awaited.
    let detached = self.mob_event_ingress.lock().await.take();
    if let Some(ingress) = detached {
        match ingress {
            MobEventIngress::Forwarder(forwarder) => {
                let forwarder_task = forwarder.task;
                forwarder_task.abort();
                let _ = forwarder_task.await;
            }
        }
    }

    // NOTE: the guard created in this `if let` scrutinee stays alive for the
    // whole block, so the subscriber-task lock is held while awaiting.
    if let Some(subscriber) = self.mob_events_subscriber_task.lock().await.take() {
        subscriber.abort();
        let _ = subscriber.await;
    }
}
/// Performs one non-blocking receive on the ingress channel.
///
/// Always returns `Some` for the ingress variant handled here; the inner
/// `Result` is whatever `try_recv` on the forwarder's receiver produced.
fn try_recv_ingress_event(
    ingress: &mut MobEventIngress,
) -> Option<Result<EventEnvelope<UnifiedEvent>, TryRecvError>> {
    let received = match ingress {
        MobEventIngress::Forwarder(forwarder) => forwarder.event_rx.try_recv(),
    };
    Some(received)
}
/// Dispatches one schedule tick and delivers any resulting runtime
/// injections to the mob.
///
/// After the module runtime has computed the dispatch report, every
/// dispatch carrying a `runtime_injection` has its message sent to the
/// target member via `send_message_on_mob`. The outcome is recorded as a
/// `runtime` module event: `runtime.injection.executed` (with the returned
/// session id) on success, or `runtime.injection.failed` on failure, in
/// which case the dispatch's `runtime_injection_error` is also set. Pending
/// mob agent events are drained before the report is returned.
///
/// # Errors
///
/// * `UnifiedRuntimeError::RuntimeShuttingDown` when shutdown has begun.
/// * Any error from the underlying schedule dispatch.
/// * Any error from appending an injection event or from the final drain.
pub async fn dispatch_schedule_tick(
    &self,
    schedules: &[ScheduleDefinition],
    tick_ms: u64,
) -> Result<ScheduleDispatchReport, UnifiedRuntimeError> {
    if self.shutting_down.load(Ordering::SeqCst) {
        return Err(UnifiedRuntimeError::RuntimeShuttingDown);
    }

    let mut dispatch_report = self
        .dispatch_schedule_tick_blocking(schedules, tick_ms)
        .await?;

    for dispatch in &mut dispatch_report.dispatched {
        // Clone so the injection stays readable while `dispatch` is mutated below.
        let Some(injection) = dispatch.runtime_injection.clone() else {
            continue;
        };

        let outcome = send_message_on_mob(
            &self.mob_handle(),
            &injection.member_id,
            injection.message.clone(),
        )
        .await;

        // Both outcomes produce the same envelope shape; only these three parts differ.
        let (event_id, event_type, payload) = match outcome {
            Ok(session_id) => (
                format!("{}-executed", injection.injection_event_id),
                "runtime.injection.executed",
                json!({
                    "schedule_id": dispatch.schedule_id.clone(),
                    "claim_key": dispatch.claim_key.clone(),
                    "member_id": injection.member_id,
                    "message": injection.message,
                    "session_id": session_id,
                }),
            ),
            Err(error) => {
                let detail = format!("mob injection failed: {error}");
                dispatch.runtime_injection_error = Some(detail.clone());
                (
                    format!("{}-failed", injection.injection_event_id),
                    "runtime.injection.failed",
                    json!({
                        "schedule_id": dispatch.schedule_id.clone(),
                        "claim_key": dispatch.claim_key.clone(),
                        "member_id": injection.member_id,
                        "message": injection.message,
                        "error_kind": "mob_runtime",
                        "error": detail,
                    }),
                )
            }
        };

        let envelope = EventEnvelope {
            event_id,
            source: "module".to_string(),
            timestamp_ms: dispatch.tick_ms,
            event: UnifiedEvent::Module(ModuleEvent {
                module: "runtime".to_string(),
                event_type: event_type.to_string(),
                payload,
            }),
        };
        self.module_runtime
            .lock()
            .await
            .append_normalized_event(envelope)?;
    }

    self.drain_mob_agent_events().await?;
    Ok(dispatch_report)
}
/// Runs the module runtime's schedule dispatch while holding its lock.
///
/// The dispatch itself happens on a joined scoped thread (see
/// `dispatch_schedule_tick_in_joined_thread`). When the current Tokio
/// runtime is multi-threaded the call is wrapped in
/// `tokio::task::block_in_place`; otherwise it is invoked directly.
///
/// # Errors
///
/// * `UnifiedRuntimeError::ScheduleDispatchThreadPanicked` when the joined
///   thread panicked.
/// * `UnifiedRuntimeError::ScheduleValidation` when the dispatch reported a
///   validation error.
async fn dispatch_schedule_tick_blocking(
    &self,
    schedules: &[ScheduleDefinition],
    tick_ms: u64,
) -> Result<ScheduleDispatchReport, UnifiedRuntimeError> {
    let mut module_runtime = self.module_runtime.lock().await;

    let on_multi_thread = tokio::runtime::Handle::try_current()
        .is_ok_and(|handle| handle.runtime_flavor() == RuntimeFlavor::MultiThread);

    let joined = if on_multi_thread {
        tokio::task::block_in_place(|| {
            Self::dispatch_schedule_tick_in_joined_thread(&mut module_runtime, schedules, tick_ms)
        })
    } else {
        Self::dispatch_schedule_tick_in_joined_thread(&mut module_runtime, schedules, tick_ms)
    };

    match joined {
        Err(_) => Err(UnifiedRuntimeError::ScheduleDispatchThreadPanicked),
        Ok(dispatched) => dispatched.map_err(UnifiedRuntimeError::ScheduleValidation),
    }
}
/// Runs `dispatch_schedule_tick` on a scoped thread and joins it.
///
/// The outer `std::thread::Result` is `Err` when the spawned thread
/// panicked; the inner `Result` is the dispatch's own outcome.
fn dispatch_schedule_tick_in_joined_thread(
    module_runtime: &mut MobkitRuntimeHandle,
    schedules: &[ScheduleDefinition],
    tick_ms: u64,
) -> std::thread::Result<Result<ScheduleDispatchReport, ScheduleValidationError>> {
    std::thread::scope(|scope| {
        let worker =
            scope.spawn(move || module_runtime.dispatch_schedule_tick(schedules, tick_ms));
        worker.join()
    })
}
}