use crate::budget::{BudgetError, BudgetLimits, CoordinationBudget};
use crate::concurrency::ConcurrencyControl;
use crate::types::AgentOutput;
use car_engine::Runtime;
use car_eventlog::EventLog;
use car_policy::PolicyEngine;
use car_state::StateStore;
use std::sync::Arc;
use tokio::sync::{Mutex as TokioMutex, RwLock as TokioRwLock};
pub struct SharedInfra {
pub state: Arc<StateStore>,
pub log: Arc<TokioMutex<EventLog>>,
pub policies: Arc<TokioRwLock<PolicyEngine>>,
pub budget: Arc<CoordinationBudget>,
pub concurrency: Option<ConcurrencyControl>,
}
impl SharedInfra {
pub fn new() -> Self {
Self {
state: Arc::new(StateStore::new()),
log: Arc::new(TokioMutex::new(EventLog::new())),
policies: Arc::new(TokioRwLock::new(PolicyEngine::new())),
budget: Arc::new(CoordinationBudget::unbounded()),
concurrency: None,
}
}
pub fn with_shared(
state: Arc<StateStore>,
log: Arc<TokioMutex<EventLog>>,
policies: Arc<TokioRwLock<PolicyEngine>>,
) -> Self {
Self {
state,
log,
policies,
budget: Arc::new(CoordinationBudget::unbounded()),
concurrency: None,
}
}
pub fn with_budget(mut self, limits: BudgetLimits) -> Self {
self.budget = Arc::new(CoordinationBudget::new(limits));
self
}
pub fn with_shared_budget(mut self, budget: Arc<CoordinationBudget>) -> Self {
self.budget = budget;
self
}
pub fn with_concurrency_control(mut self, control: ConcurrencyControl) -> Self {
self.concurrency = Some(control);
self
}
pub fn with_concurrency_gating(self) -> Self {
self.with_concurrency_control(ConcurrencyControl::with_default_policy())
}
pub fn begin_agent(&self) -> Result<(), BudgetError> {
self.budget.try_begin_agent()
}
pub fn record_output(&self, out: &AgentOutput) {
self.budget.record_output(out);
}
pub async fn record_output_metered(&self, out: &AgentOutput) {
self.record_output(out);
let Some(tokens) = &out.tokens else {
return;
};
let mut data: std::collections::HashMap<String, serde_json::Value> =
std::collections::HashMap::new();
data.insert(
"agent".to_string(),
serde_json::Value::from(out.name.clone()),
);
if !out.tools_used.is_empty() {
data.insert(
"tools".to_string(),
serde_json::Value::from(out.tools_used.join(",")),
);
}
let metrics = car_eventlog::Metrics {
duration_ms: Some(out.duration_ms),
tokens_in: Some(tokens.input_tokens),
tokens_out: Some(tokens.output_tokens),
cost_usd: Some(tokens.cost_usd),
};
let mut log = self.log.lock().await;
log.append_metered(
car_eventlog::EventKind::InferenceMetered,
None,
None,
data,
metrics,
);
}
pub fn make_runtime(&self) -> Runtime {
Runtime::with_shared(
Arc::clone(&self.state),
Arc::clone(&self.log),
Arc::clone(&self.policies),
)
}
pub fn make_isolated_runtime(
&self,
agent_name: &str,
) -> (Runtime, crate::task_context::AgentContext) {
let ctx = crate::task_context::AgentContext::new(agent_name, Arc::clone(&self.state));
let rt = Runtime::with_shared(
Arc::clone(&ctx.local_state),
Arc::clone(&ctx.local_log),
Arc::clone(&self.policies),
);
(rt, ctx)
}
}
impl Default for SharedInfra {
fn default() -> Self {
Self::new()
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn with_shared_reuses_the_exact_provided_arcs() {
let state = Arc::new(StateStore::new());
let log = Arc::new(TokioMutex::new(EventLog::new()));
let policies = Arc::new(TokioRwLock::new(PolicyEngine::new()));
let infra =
SharedInfra::with_shared(Arc::clone(&state), Arc::clone(&log), Arc::clone(&policies));
assert!(Arc::ptr_eq(&infra.state, &state));
assert!(Arc::ptr_eq(&infra.log, &log));
assert!(Arc::ptr_eq(&infra.policies, &policies));
}
}