use std::sync::atomic::{AtomicU32, AtomicU8, Ordering};
use std::sync::Arc;
use once_cell::sync::OnceCell;
use scc::HashMap as SccHashMap;
use serde::Serialize;
use uuid::Uuid;
use agent_os_sidecar::protocol::{
AuthenticateRequest, OwnershipScope, RequestPayload, ResponsePayload,
};
use crate::agent_os::AgentOs;
use crate::error::ClientError;
use crate::transport::SidecarTransport;
pub(crate) struct SharedConnection {
pub(crate) transport: Arc<SidecarTransport>,
pub(crate) connection_id: String,
}
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize)]
#[serde(rename_all = "lowercase")]
pub enum SidecarState {
Ready,
Disposing,
Disposed,
}
impl SidecarState {
pub const fn as_str(self) -> &'static str {
match self {
SidecarState::Ready => "ready",
SidecarState::Disposing => "disposing",
SidecarState::Disposed => "disposed",
}
}
pub(crate) const fn as_u8(self) -> u8 {
match self {
SidecarState::Ready => 0,
SidecarState::Disposing => 1,
SidecarState::Disposed => 2,
}
}
pub(crate) const fn from_u8(value: u8) -> Self {
match value {
0 => SidecarState::Ready,
1 => SidecarState::Disposing,
2 => SidecarState::Disposed,
_ => SidecarState::Disposed,
}
}
}
#[derive(Debug, Clone, PartialEq, Eq, Serialize)]
#[serde(tag = "kind", rename_all = "lowercase")]
pub enum AgentOsSidecarPlacement {
Shared {
#[serde(skip_serializing_if = "Option::is_none")]
pool: Option<String>,
},
Explicit {
#[serde(rename = "sidecarId")]
sidecar_id: String,
},
}
#[derive(Debug, Clone, PartialEq, Eq, Serialize)]
#[serde(rename_all = "camelCase")]
pub struct AgentOsSidecarDescription {
pub sidecar_id: String,
pub placement: AgentOsSidecarPlacement,
pub state: SidecarState,
pub active_vm_count: u32,
}
pub struct AgentOsSidecar {
pub(crate) sidecar_id: String,
pub(crate) placement: AgentOsSidecarPlacement,
pub(crate) shared_pool: Option<String>,
pub(crate) state: AtomicU8,
pub(crate) active_vm_count: AtomicU32,
pub(crate) connection: tokio::sync::Mutex<Option<SharedConnection>>,
}
impl AgentOsSidecar {
pub(crate) fn new(
sidecar_id: impl Into<String>,
placement: AgentOsSidecarPlacement,
shared_pool: Option<String>,
) -> Self {
Self {
sidecar_id: sidecar_id.into(),
placement,
shared_pool,
state: AtomicU8::new(SidecarState::Ready.as_u8()),
active_vm_count: AtomicU32::new(0),
connection: tokio::sync::Mutex::new(None),
}
}
pub(crate) async fn ensure_connection(
&self,
) -> Result<(Arc<SidecarTransport>, String, usize), ClientError> {
let mut guard = self.connection.lock().await;
if let Some(existing) = guard.as_ref() {
let max_frame = existing.transport.max_frame_bytes.load(Ordering::SeqCst);
return Ok((
existing.transport.clone(),
existing.connection_id.clone(),
max_frame,
));
}
let transport = SidecarTransport::spawn().await?;
let authed = match transport
.request(
OwnershipScope::connection("client-hint"),
RequestPayload::Authenticate(AuthenticateRequest {
client_name: "agent-os-client".to_string(),
auth_token: "agent-os-client".to_string(),
bridge_version: agent_os_bridge::bridge_contract().version,
}),
)
.await?
{
ResponsePayload::Authenticated(authed) => authed,
ResponsePayload::Rejected(rejected) => {
return Err(ClientError::Kernel {
code: rejected.code,
message: rejected.message,
});
}
_ => return Err(ClientError::Sidecar("unexpected authenticate response".to_string())),
};
let max_frame = authed.max_frame_bytes as usize;
transport.max_frame_bytes.store(max_frame, Ordering::SeqCst);
*guard = Some(SharedConnection {
transport: transport.clone(),
connection_id: authed.connection_id.clone(),
});
Ok((transport, authed.connection_id, max_frame))
}
pub(crate) async fn kill_connection(&self) {
if let Some(connection) = self.connection.lock().await.take() {
if let Some(mut child) = connection.transport.child.lock().take() {
let _ = child.start_kill();
}
}
}
pub fn describe(&self) -> AgentOsSidecarDescription {
AgentOsSidecarDescription {
sidecar_id: self.sidecar_id.clone(),
placement: self.placement.clone(),
state: SidecarState::from_u8(self.state.load(Ordering::SeqCst)),
active_vm_count: self.active_vm_count.load(Ordering::SeqCst),
}
}
pub async fn dispose(&self) -> Result<(), ClientError> {
if SidecarState::from_u8(self.state.load(Ordering::SeqCst)) == SidecarState::Disposed {
return Ok(());
}
self.state
.store(SidecarState::Disposing.as_u8(), Ordering::SeqCst);
let errors: Vec<String> = Vec::new();
self.active_vm_count.store(0, Ordering::SeqCst);
self.state
.store(SidecarState::Disposed.as_u8(), Ordering::SeqCst);
if let Some(pool) = self.shared_pool.as_deref() {
let self_id = self.sidecar_id.as_str();
let _ = shared_sidecars().remove_if(pool, |cached| cached.sidecar_id == self_id);
}
if errors.is_empty() {
Ok(())
} else {
let aggregated = errors.join("; ");
Err(ClientError::Sidecar(aggregated))
}
}
}
pub(crate) trait SidecarVmAdmin: Send + Sync {
}
pub(crate) struct AgentOsSidecarVmLease {
pub(crate) vm_id: String,
pub(crate) sidecar: Arc<AgentOsSidecar>,
}
impl AgentOsSidecarVmLease {
pub(crate) async fn dispose(self) -> Result<(), ClientError> {
let sidecar = self.sidecar;
let mut current = sidecar.active_vm_count.load(Ordering::SeqCst);
loop {
let next = current.saturating_sub(1);
match sidecar.active_vm_count.compare_exchange_weak(
current,
next,
Ordering::SeqCst,
Ordering::SeqCst,
) {
Ok(_) => break,
Err(observed) => current = observed,
}
}
Ok(())
}
}
static SHARED_SIDECARS: OnceCell<SccHashMap<String, Arc<AgentOsSidecar>>> = OnceCell::new();
pub(crate) fn shared_sidecars() -> &'static SccHashMap<String, Arc<AgentOsSidecar>> {
SHARED_SIDECARS.get_or_init(SccHashMap::new)
}
impl AgentOs {
pub async fn create_sidecar(
sidecar_id: Option<String>,
) -> Result<Arc<AgentOsSidecar>, ClientError> {
let sidecar_id = sidecar_id.unwrap_or_else(|| format!("agent-os-sidecar-{}", Uuid::new_v4()));
let placement = AgentOsSidecarPlacement::Explicit {
sidecar_id: sidecar_id.clone(),
};
Ok(Arc::new(AgentOsSidecar::new(sidecar_id, placement, None)))
}
pub async fn get_shared_sidecar(
pool: Option<String>,
) -> Result<Arc<AgentOsSidecar>, ClientError> {
let pool = pool.unwrap_or_else(|| "default".to_string());
let cache = shared_sidecars();
if let Some(existing) = cache.read(&pool, |_, sidecar| sidecar.clone()) {
if existing.describe().state != SidecarState::Disposed {
return Ok(existing);
}
}
let placement_pool = if pool.is_empty() {
None
} else {
Some(pool.clone())
};
let sidecar = Arc::new(AgentOsSidecar::new(
format!("agent-os-shared-sidecar:{pool}"),
AgentOsSidecarPlacement::Shared {
pool: placement_pool,
},
Some(pool.clone()),
));
match cache.entry(pool) {
scc::hash_map::Entry::Occupied(mut occupied) => {
if occupied.get().describe().state == SidecarState::Disposed {
*occupied.get_mut() = sidecar.clone();
Ok(sidecar)
} else {
Ok(occupied.get().clone())
}
}
scc::hash_map::Entry::Vacant(vacant) => {
vacant.insert_entry(sidecar.clone());
Ok(sidecar)
}
}
}
}