use std::sync::Arc;
use axum::Router;
use axum::routing::get;
use meerkat_mob::ids::MeerkatId;
use crate::console_aggregator::{
ConsoleVisibilityPolicy, HideImplicitDelegateMembersConsoleVisibilityPolicy,
};
use crate::http_console::{
console_frontend_router, console_json_router_with_runtime_events_and_policy,
};
use crate::http_sse::{
agent_events_sse_router, mob_events_sse_router, mob_structural_events_sse_router,
};
use crate::runtime::RuntimeDecisionState;
use tower::limit::ConcurrencyLimitLayer;
use super::UnifiedRuntime;
pub const DEFAULT_REFERENCE_APP_MAX_CONCURRENT_REQUESTS: usize = 1024;
impl UnifiedRuntime {
pub fn build_console_json_router(&self, decisions: RuntimeDecisionState) -> Router {
self.build_console_json_router_with_policy(
decisions,
Arc::new(HideImplicitDelegateMembersConsoleVisibilityPolicy),
)
}
pub fn build_console_json_router_with_policy(
&self,
decisions: RuntimeDecisionState,
visibility_policy: Arc<dyn ConsoleVisibilityPolicy>,
) -> Router {
console_json_router_with_runtime_events_and_policy(
decisions,
self.mob_runtime.clone(),
Some(self.module_runtime_handle()),
self.contact_directory.clone(),
self.event_log_store(),
self.gateway_peer_keys().cloned(),
Some(self.console_events()),
Some(self.console_log_store()),
Some(self.mob_events_store()),
Some(Arc::clone(self.metadata_table())),
self.identity_runtime().cloned(),
visibility_policy,
)
}
pub fn build_console_frontend_router(&self) -> Router {
console_frontend_router()
}
pub fn build_reference_app_router(&self, decisions: RuntimeDecisionState) -> Router {
self.build_reference_app_router_with_console_visibility_policy(
decisions,
Arc::new(HideImplicitDelegateMembersConsoleVisibilityPolicy),
)
}
pub fn build_reference_app_router_with_console_visibility_policy(
&self,
decisions: RuntimeDecisionState,
visibility_policy: Arc<dyn ConsoleVisibilityPolicy>,
) -> Router {
let agent_runtime = self.mob_runtime.clone();
let mob_runtime = self.mob_runtime.clone();
let sse_decisions_a = decisions.clone();
let sse_decisions_b = decisions.clone();
let sse_decisions_c = decisions.clone();
Router::new()
.route("/healthz", get(|| async { "ok" }))
.merge(self.build_console_frontend_router())
.merge(self.build_console_json_router_with_policy(decisions, visibility_policy))
.merge(agent_events_sse_router(
Arc::new(move |agent_id| {
let runtime = agent_runtime.clone();
Box::pin(async move {
runtime
.handle()
.subscribe_agent_events(&MeerkatId::from(agent_id))
.await
.map_err(Into::into)
})
}),
Some(sse_decisions_a),
))
.merge(mob_events_sse_router(
Arc::new(move || {
let mob_runtime = mob_runtime.clone();
Box::pin(async move { mob_runtime.handle().subscribe_mob_events().await })
}),
Some(sse_decisions_b),
))
.merge(mob_structural_events_sse_router(
self.mob_runtime.handle(),
self.mob_events_store(),
Some(sse_decisions_c),
))
.layer(ConcurrencyLimitLayer::new(
DEFAULT_REFERENCE_APP_MAX_CONCURRENT_REQUESTS,
))
}
}
#[cfg(test)]
mod tests {
use super::DEFAULT_REFERENCE_APP_MAX_CONCURRENT_REQUESTS;
#[test]
fn reference_router_default_concurrency_allows_sse_fanout() {
assert_eq!(DEFAULT_REFERENCE_APP_MAX_CONCURRENT_REQUESTS, 1024);
}
}