use std::collections::HashSet;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;
use dashmap::DashMap;
use tokio::sync::{mpsc, Notify};
use tokio::task::JoinHandle;
use tokio_util::sync::CancellationToken;
use super::artifact::ArtifactStore;
use super::context::ToolContext;
use super::credentials::{
revoke_all_for_job, CredentialJobContext, CredentialLedger, CredentialProvisioner,
};
use super::job::{JobEntry, JobRegistry};
use super::session::{HandshakePhase, SessionState};
use super::subscription::SubscriptionManager;
use super::tools::ToolRegistry;
use crate::store::eventlog::EventLog;
use arcp_core::auth::{AuthOutcome, AuthRegistry, Authenticator};
use arcp_core::envelope::Envelope;
use arcp_core::error::{ARCPError, ErrorCode};
use arcp_core::extensions::ExtensionRegistry;
use arcp_core::ids::IdempotencyKey;
use arcp_core::ids::SubscriptionId;
use arcp_core::ids::{JobId, MessageId, SessionId};
use arcp_core::messages::{
ArtifactFetchPayload, ArtifactPutPayload, ArtifactRefPayload, ArtifactReleasePayload,
CancelPayload, CancelTargetKind, Capabilities, JobAcceptedPayload, JobCancelledPayload,
JobCompletedPayload, JobFailedPayload, JobStartedPayload, JobState, JobSubscribePayload,
JobSubscribedPayload, JobUnsubscribePayload, LeaseRequest, MessageType, NackPayload,
RuntimeIdentity, SessionAcceptedPayload, SessionLease, SessionOpenPayload,
SessionRejectedPayload, SessionUnauthenticatedPayload, SubscribeAcceptedPayload,
SubscribeEventPayload, SubscribePayload, ToolInvokePayload, UnsubscribePayload,
};
use arcp_core::transport::Transport;
use arcp_core::{IMPL_KIND, IMPL_VERSION};
/// Builder for [`ARCPRuntime`].
///
/// Collects authenticators, the tool registry, advertised capabilities and
/// per-connection tuning; [`RuntimeBuilder::build`] turns them into the
/// shared runtime state.
pub struct RuntimeBuilder {
/// Authenticators, looked up by auth scheme during the handshake.
auth: AuthRegistry,
/// Tools that `tool.invoke` requests are dispatched to.
tools: ToolRegistry,
/// Capabilities the runtime offers; intersected with the client's offer
/// during `session.open`.
advertised_capabilities: Capabilities,
/// Identity reported to clients in `session.accepted`.
runtime_identity: RuntimeIdentity,
/// Lease length reported in `session.accepted`; `None` sends no lease.
session_lease_seconds: Option<u64>,
/// Maximum number of un-acked countable outbound events per connection;
/// `None` disables this flow control.
ack_window: Option<u64>,
/// Issues per-job credentials when an invocation carries a lease.
credential_provisioner: Option<Arc<dyn CredentialProvisioner>>,
}
impl Default for RuntimeBuilder {
fn default() -> Self {
Self::new()
}
}
impl std::fmt::Debug for RuntimeBuilder {
    /// Prints the scalar configuration, including the ack window, and reports
    /// the credential provisioner only as present or absent. The auth and
    /// tool registries are not printed, hence `finish_non_exhaustive`.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("RuntimeBuilder")
            .field("advertised_capabilities", &self.advertised_capabilities)
            .field("runtime_identity", &self.runtime_identity)
            .field("session_lease_seconds", &self.session_lease_seconds)
            .field("ack_window", &self.ack_window)
            .field(
                "has_credential_provisioner",
                &self.credential_provisioner.is_some(),
            )
            .finish_non_exhaustive()
    }
}
impl RuntimeBuilder {
    /// Creates a builder with:
    /// - no authenticators and an empty tool registry;
    /// - default capabilities;
    /// - this implementation's identity, marked `"trusted"`;
    /// - a one-hour session lease;
    /// - no ack window and no credential provisioner.
    #[must_use]
    pub fn new() -> Self {
        const ONE_HOUR_SECONDS: u64 = 60 * 60;
        Self {
            auth: AuthRegistry::new(),
            tools: ToolRegistry::new(),
            advertised_capabilities: Capabilities::default(),
            runtime_identity: RuntimeIdentity {
                kind: String::from(IMPL_KIND),
                version: String::from(IMPL_VERSION),
                fingerprint: None,
                trust_level: Some("trusted".into()),
            },
            session_lease_seconds: Some(ONE_HOUR_SECONDS),
            ack_window: None,
            credential_provisioner: None,
        }
    }

    /// Registers an additional authenticator.
    #[must_use]
    pub fn with_authenticator(self, auth: Box<dyn Authenticator>) -> Self {
        let mut builder = self;
        builder.auth.register(auth);
        builder
    }

    /// Replaces the tool registry.
    #[must_use]
    pub fn with_tools(self, tools: ToolRegistry) -> Self {
        Self { tools, ..self }
    }

    /// Replaces the advertised capabilities wholesale.
    ///
    /// This overwrites any flags set earlier by
    /// [`RuntimeBuilder::with_credential_provisioner`], so call it first.
    #[must_use]
    pub fn with_capabilities(self, caps: Capabilities) -> Self {
        Self {
            advertised_capabilities: caps,
            ..self
        }
    }

    /// Replaces the identity reported in `session.accepted`.
    #[must_use]
    pub fn with_identity(self, ident: RuntimeIdentity) -> Self {
        Self {
            runtime_identity: ident,
            ..self
        }
    }

    /// Sets the session lease length reported in `session.accepted`.
    #[must_use]
    pub const fn with_session_lease_seconds(mut self, seconds: u64) -> Self {
        self.session_lease_seconds = Some(seconds);
        self
    }

    /// Sets the per-connection ack window; `0` disables flow control.
    #[must_use]
    pub const fn with_ack_window(mut self, window: u64) -> Self {
        self.ack_window = match window {
            0 => None,
            limit => Some(limit),
        };
        self
    }

    /// Installs a credential provisioner and advertises the `model_use` and
    /// `provisioned_credentials` capabilities.
    #[must_use]
    pub fn with_credential_provisioner(
        mut self,
        provisioner: Arc<dyn CredentialProvisioner>,
    ) -> Self {
        let caps = &mut self.advertised_capabilities;
        caps.model_use = Some(true);
        caps.provisioned_credentials = Some(true);
        self.credential_provisioner = Some(provisioner);
        self
    }

    /// Assembles the runtime with an in-memory event log.
    ///
    /// # Errors
    ///
    /// - `FailedPrecondition` when `provisioned_credentials` is advertised
    ///   but no credential provisioner is installed.
    /// - Any error from creating the in-memory event log.
    pub async fn build(self) -> Result<ARCPRuntime, ARCPError> {
        let advertises_credentials =
            self.advertised_capabilities.provisioned_credentials == Some(true);
        if advertises_credentials && self.credential_provisioner.is_none() {
            return Err(ARCPError::FailedPrecondition {
                detail: "provisioned_credentials advertised without a CredentialProvisioner".into(),
            });
        }
        let Self {
            auth,
            tools,
            advertised_capabilities,
            runtime_identity,
            session_lease_seconds,
            ack_window,
            credential_provisioner,
        } = self;
        let event_log = EventLog::in_memory().await?;
        let inner = RuntimeInner {
            auth,
            tools,
            advertised_capabilities,
            runtime_identity,
            session_lease_seconds,
            ack_window,
            extension_registry: ExtensionRegistry::new(),
            event_log,
            artifacts: ArtifactStore::new(),
            subscriptions: SubscriptionManager::new(),
            jobs: JobRegistry::new(),
            session_principals: DashMap::new(),
            credential_provisioner,
            credential_ledger: CredentialLedger::new(),
            idempotency_index: DashMap::new(),
        };
        Ok(ARCPRuntime {
            inner: Arc::new(inner),
        })
    }
}
/// State shared by every clone of [`ARCPRuntime`].
struct RuntimeInner {
/// Authenticators keyed by auth scheme.
auth: AuthRegistry,
/// Tools that `tool.invoke` dispatches to.
tools: ToolRegistry,
/// Capabilities offered to clients before negotiation.
advertised_capabilities: Capabilities,
/// Identity sent in `session.accepted`.
runtime_identity: RuntimeIdentity,
/// Lease length sent in `session.accepted`, if any.
session_lease_seconds: Option<u64>,
/// Per-connection limit on un-acked countable outbound events, if any.
ack_window: Option<u64>,
// Not read in the code shown here; the lint is silenced until it is wired up.
#[allow(dead_code)]
extension_registry: ExtensionRegistry,
/// Log of inbound envelopes and of every envelope the outbound writer handles.
event_log: EventLog,
/// Artifact storage behind `artifact.put` / `artifact.fetch` / `artifact.release`.
artifacts: ArtifactStore,
/// Subscription registry; envelopes are published here for subscribers.
subscriptions: SubscriptionManager,
/// Registry of jobs started by `tool.invoke`.
jobs: JobRegistry,
/// Principal (if any) of each accepted session, removed when its connection
/// ends. Used to authorize cross-session cancel and job.subscribe.
session_principals: DashMap<SessionId, Option<String>>,
/// Issues per-job credentials when an invocation carries a lease.
credential_provisioner: Option<Arc<dyn CredentialProvisioner>>,
/// Credentials issued per job, used to revoke them when the job ends.
credential_ledger: CredentialLedger,
/// Accepted keyed invocations, used to answer idempotent replays.
/// NOTE(review): no eviction is visible in this file — confirm entries are bounded.
idempotency_index: DashMap<IdempotencyScope, IdempotencyRecord>,
}
/// Key of the idempotency index. An idempotency key only has meaning within
/// the principal (or, for sessions without one, the session) that supplied it.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
struct IdempotencyScope {
/// Principal name, or the session id as a string when there is no principal.
principal_or_session: String,
/// Idempotency key taken from the `tool.invoke` envelope.
idempotency_key: IdempotencyKey,
}
/// What a keyed `tool.invoke` was accepted with; used to answer replays.
#[derive(Debug, Clone)]
struct IdempotencyRecord {
/// The `job.accepted` payload re-sent for an identical replay.
accepted: JobAcceptedPayload,
/// Tool identifier compared against a replay's `tool` field.
tool: String,
/// `serde_json::to_string` of the arguments, compared against a replay's arguments.
arguments_canonical: String,
}
/// Handle to the runtime. Cloning is cheap: every clone shares the same
/// `RuntimeInner` through an `Arc`.
#[derive(Clone)]
pub struct ARCPRuntime {
inner: Arc<RuntimeInner>,
}
impl std::fmt::Debug for ARCPRuntime {
    /// Opaque output: the shared runtime state is not printed.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let mut out = f.debug_struct("ARCPRuntime");
        out.finish_non_exhaustive()
    }
}
impl ARCPRuntime {
/// Starts configuring a new runtime (same as [`RuntimeBuilder::new`]).
#[must_use]
pub fn builder() -> RuntimeBuilder {
    RuntimeBuilder::default()
}
#[must_use]
pub fn event_log(&self) -> &EventLog {
&self.inner.event_log
}
#[must_use]
pub fn artifacts(&self) -> &ArtifactStore {
&self.inner.artifacts
}
#[must_use]
pub fn subscriptions(&self) -> &SubscriptionManager {
&self.inner.subscriptions
}
/// Runs [`ARCPRuntime::run_connection`] for `transport` on a new tokio task.
///
/// Errors that end the connection are logged at warn level rather than
/// returned. The returned handle completes when the connection is done.
#[must_use]
pub fn serve_connection<T: Transport + 'static>(&self, transport: T) -> JoinHandle<()> {
    let runtime = self.clone();
    let connection = async move {
        let Err(e) = runtime.run_connection(transport).await else {
            return;
        };
        tracing::warn!(error = %e, "connection terminated with error");
    };
    tokio::spawn(connection)
}
#[allow(clippy::too_many_lines, clippy::cognitive_complexity)]
pub async fn run_connection<T: Transport + 'static>(
&self,
transport: T,
) -> Result<(), ARCPError> {
let transport = Arc::new(transport);
let (out_tx, mut out_rx) = mpsc::channel::<Envelope>(256);
let writer_transport = Arc::clone(&transport);
let event_log = self.inner.event_log.clone();
let writer_subs = self.inner.subscriptions.clone();
let ack_window = self.inner.ack_window;
let emitted = Arc::new(AtomicU64::new(0));
let last_ack = Arc::new(AtomicU64::new(0));
let ack_notify = Arc::new(Notify::new());
let writer_emitted = Arc::clone(&emitted);
let writer_last_ack = Arc::clone(&last_ack);
let writer_ack_notify = Arc::clone(&ack_notify);
let writer_jobs = self.inner.jobs.clone();
let writer = tokio::spawn(async move {
while let Some(mut env) = out_rx.recv().await {
let is_countable = env.payload.is_countable_event();
if is_countable {
if let Some(window) = ack_window {
loop {
let in_flight = writer_emitted
.load(Ordering::Acquire)
.saturating_sub(writer_last_ack.load(Ordering::Acquire));
if in_flight < window {
break;
}
writer_ack_notify.notified().await;
if out_rx.is_closed() {
return;
}
}
}
let seq = writer_emitted.fetch_add(1, Ordering::AcqRel) + 1;
env.event_seq = Some(seq);
if let Some(job_id) = env.job_id.as_ref() {
writer_jobs.record_event_seq(job_id, seq);
}
}
if let Err(e) = event_log.append(&env).await {
tracing::warn!(error = %e, "failed to persist outbound envelope");
}
if !matches!(env.payload, MessageType::SubscribeEvent(_)) {
let publish_env = redact_for_subscribers(&env);
let _ = writer_subs.publish(&publish_env);
}
if let Err(e) = writer_transport.send(env).await {
tracing::warn!(error = %e, "transport send failed; closing writer");
break;
}
}
});
let jobs = self.inner.jobs.clone();
let connection_subs: Arc<DashMap<SubscriptionId, JoinHandle<()>>> =
Arc::new(DashMap::new());
let connection_job_subs: Arc<DashMap<JobId, JoinHandle<()>>> = Arc::new(DashMap::new());
let mut state: Option<SessionState> = None;
let mut seen_ids: HashSet<MessageId> = HashSet::new();
let mut explicit_close = false;
let result = loop {
let Some(envelope) = transport.recv().await? else {
break Ok(());
};
if !seen_ids.insert(envelope.id.clone()) {
tracing::debug!(id = %envelope.id, "dropping replayed envelope");
continue;
}
self.inner.event_log.append(&envelope).await?;
let _ = self.inner.subscriptions.publish(&envelope);
let in_handshake = state.as_ref().is_none_or(|s| !s.is_accepted());
if in_handshake && !envelope.payload.is_handshake() {
tracing::warn!(
id = %envelope.id,
type_name = envelope.payload.type_name(),
"dropping non-handshake message before session.accepted",
);
continue;
}
match envelope.payload.clone() {
MessageType::SessionOpen(payload) => {
state = Some(
self.handle_session_open(&out_tx, envelope.id.clone(), payload)
.await?,
);
}
MessageType::SessionAuthenticate(payload) => {
if let Some(s) = state.as_mut() {
self.handle_session_authenticate(
&out_tx,
envelope.id.clone(),
s,
&payload.response,
)
.await?;
} else {
tracing::warn!("session.authenticate before session.open; dropping");
}
}
MessageType::SessionClose(_) => {
tracing::info!("session.close received");
explicit_close = true;
break Ok(());
}
MessageType::ToolInvoke(payload) => {
if let Some(s) = state.as_ref() {
self.spawn_tool_invoke(
&out_tx,
&jobs,
envelope.id.clone(),
s.session_id.clone(),
s.principal.clone(),
envelope.idempotency_key.clone(),
payload,
)
.await;
}
}
MessageType::Cancel(payload) => {
if let Some(s) = state.as_ref() {
self.handle_cancel(&out_tx, &jobs, envelope.id.clone(), s, &payload)
.await;
}
}
MessageType::Ping(_) => {
let mut env = Envelope::new(MessageType::Pong(
arcp_core::messages::PongPayload::default(),
));
env.correlation_id = Some(envelope.id.clone());
if let Some(s) = state.as_ref() {
env.session_id = Some(s.session_id.clone());
}
let _ = out_tx.send(env).await;
}
MessageType::SessionPing(payload) => {
let mut env = Envelope::new(MessageType::SessionPong(
arcp_core::messages::SessionPongPayload {
ping_nonce: payload.nonce,
received_at: chrono::Utc::now(),
},
));
env.correlation_id = Some(envelope.id.clone());
if let Some(s) = state.as_ref() {
env.session_id = Some(s.session_id.clone());
}
let _ = out_tx.send(env).await;
}
MessageType::SessionPong(_) => {
}
MessageType::SessionAck(payload) => {
let cur = last_ack.load(Ordering::Acquire);
if payload.last_processed_seq > cur {
last_ack.store(payload.last_processed_seq, Ordering::Release);
ack_notify.notify_waiters();
}
}
MessageType::SessionListJobs(payload) => {
if let Some(s) = state.as_ref() {
let jobs_list =
jobs.list_for_session(&s.session_id, payload.filter.as_ref());
let response =
MessageType::SessionJobs(arcp_core::messages::SessionJobsPayload {
request_id: envelope.id.to_string(),
jobs: jobs_list,
next_cursor: None,
});
let mut env = Envelope::new(response);
env.correlation_id = Some(envelope.id.clone());
env.session_id = Some(s.session_id.clone());
let _ = out_tx.send(env).await;
}
}
MessageType::Subscribe(payload) => {
if let Some(s) = state.as_ref() {
Self::handle_subscribe(
&out_tx,
&self.inner.subscriptions,
&connection_subs,
envelope.id.clone(),
s.session_id.clone(),
payload,
)
.await;
}
}
MessageType::Unsubscribe(UnsubscribePayload { subscription_id }) => {
if let Some((_, join)) = connection_subs.remove(&subscription_id) {
join.abort();
}
let _ = self.inner.subscriptions.unsubscribe(&subscription_id);
}
MessageType::JobSubscribe(payload) => {
if let Some(s) = state.as_ref() {
Self::handle_job_subscribe(
&out_tx,
&self.inner.subscriptions,
&self.inner.jobs,
&self.inner.session_principals,
&connection_job_subs,
envelope.id.clone(),
s.session_id.clone(),
s.principal.clone(),
payload,
)
.await;
}
}
MessageType::JobUnsubscribe(JobUnsubscribePayload { job_id }) => {
if let Some((_, join)) = connection_job_subs.remove(&job_id) {
join.abort();
}
}
MessageType::ArtifactPut(payload) => {
if let Some(s) = state.as_ref() {
Self::handle_artifact_put(
&out_tx,
&self.inner.artifacts,
envelope.id.clone(),
s.session_id.clone(),
payload,
)
.await;
}
}
MessageType::ArtifactFetch(payload) => {
if let Some(s) = state.as_ref() {
Self::handle_artifact_fetch(
&out_tx,
&self.inner.artifacts,
envelope.id.clone(),
s.session_id.clone(),
payload,
)
.await;
}
}
MessageType::ArtifactRelease(ArtifactReleasePayload { artifact_id }) => {
self.inner.artifacts.release(&artifact_id);
}
_ if in_handshake => {
tracing::warn!(
type_name = envelope.payload.type_name(),
"unexpected handshake message direction",
);
}
_ => {
tracing::debug!(
type_name = envelope.payload.type_name(),
"dispatch arm not yet implemented",
);
}
}
};
if explicit_close {
if let Some(s) = state.as_ref() {
for snap in jobs.list_for_session(&s.session_id, None) {
let _ = jobs.cancel(&snap.job_id);
}
}
}
if let Some(s) = state.as_ref() {
self.inner.session_principals.remove(&s.session_id);
}
for entry in connection_subs.iter() {
entry.value().abort();
}
connection_subs.clear();
for entry in connection_job_subs.iter() {
entry.value().abort();
}
connection_job_subs.clear();
drop(out_tx);
ack_notify.notify_waiters();
let _ = writer.await;
result
}
/// Handles `session.open`.
///
/// It negotiates capabilities, allocates a fresh [`SessionId`], and
/// authenticates with the authenticator registered for the requested scheme.
///
/// The returned state reflects the result:
/// - `Accepted`: the principal is recorded and `session.accepted` is sent.
/// - `Challenged`: `session.challenge` is sent.
/// - `Closed`: `session.rejected` is sent, because the scheme is not
///   configured or the authenticator rejected.
///
/// # Errors
///
/// Propagates errors returned by the authenticator.
async fn handle_session_open(
    &self,
    out: &mpsc::Sender<Envelope>,
    correlation_id: MessageId,
    payload: SessionOpenPayload,
) -> Result<SessionState, ARCPError> {
    let SessionOpenPayload {
        auth,
        client,
        capabilities: offered,
    } = payload;
    let negotiated = self.negotiate_capabilities(&offered);
    let session_id = SessionId::new();
    let mut state = SessionState::new(session_id.clone(), negotiated.clone());

    let authenticator = match self.inner.auth.get(&auth.scheme) {
        Some(found) => found,
        None => {
            let reason = format!("auth scheme {:?} not configured", auth.scheme);
            self.send_rejected(out, correlation_id, ErrorCode::Unauthenticated, reason)
                .await;
            state.phase = HandshakePhase::Closed;
            return Ok(state);
        }
    };

    let verdict = authenticator
        .authenticate(&auth, &client, &negotiated)
        .await?;
    match verdict {
        AuthOutcome::Accept { principal } => {
            self.inner
                .session_principals
                .insert(session_id.clone(), Some(principal.clone()));
            state.principal = Some(principal);
            state.phase = HandshakePhase::Accepted;
            self.send_accepted(out, correlation_id, &session_id, &negotiated)
                .await;
        }
        AuthOutcome::Challenge { challenge } => {
            let challenge_payload = arcp_core::messages::SessionChallengePayload {
                challenge: challenge.clone(),
            };
            let mut env = Envelope::new(MessageType::SessionChallenge(challenge_payload));
            env.correlation_id = Some(correlation_id);
            env.session_id = Some(session_id);
            state.active_challenge = Some(challenge);
            state.phase = HandshakePhase::Challenged;
            let _ = out.send(env).await;
        }
        AuthOutcome::Reject { reason } => {
            self.send_rejected(out, correlation_id, ErrorCode::Unauthenticated, reason)
                .await;
            state.phase = HandshakePhase::Closed;
        }
    }
    Ok(state)
}
/// Handles `session.authenticate` by checking `response` against the
/// challenge stored in `state`.
///
/// The authenticators registered for the Bearer and SignedJwt schemes are
/// asked in that order. The first `Accept`:
/// - records the principal;
/// - marks the session accepted and clears the challenge;
/// - sends `session.accepted`.
///
/// If none accepts, `session.unauthenticated` is sent and the challenge stays
/// in place. Without an active challenge the message is ignored.
///
/// NOTE(review): the scheme that issued the challenge is not tracked, so a
/// different scheme's authenticator may accept the response. Failed attempts
/// are also not limited. Confirm both are intended.
///
/// # Errors
///
/// Propagates the first error returned by `verify_challenge_response`.
async fn handle_session_authenticate(
    &self,
    out: &mpsc::Sender<Envelope>,
    correlation_id: MessageId,
    state: &mut SessionState,
    response: &str,
) -> Result<(), ARCPError> {
    let challenge = match state.active_challenge.clone() {
        Some(challenge) => challenge,
        None => {
            tracing::warn!("session.authenticate without active challenge; dropping");
            return Ok(());
        }
    };
    let candidate_schemes = [
        arcp_core::messages::AuthScheme::Bearer,
        arcp_core::messages::AuthScheme::SignedJwt,
    ];
    for scheme in candidate_schemes {
        if let Some(authenticator) = self.inner.auth.get(&scheme) {
            let verdict = authenticator
                .verify_challenge_response(&challenge, response)
                .await?;
            if let AuthOutcome::Accept { principal } = verdict {
                self.inner
                    .session_principals
                    .insert(state.session_id.clone(), Some(principal.clone()));
                state.principal = Some(principal);
                state.phase = HandshakePhase::Accepted;
                state.active_challenge = None;
                self.send_accepted(out, correlation_id, &state.session_id, &state.capabilities)
                    .await;
                return Ok(());
            }
        }
    }
    let rejection = SessionUnauthenticatedPayload {
        code: ErrorCode::Unauthenticated,
        message: "challenge response did not validate".into(),
    };
    let mut env = Envelope::new(MessageType::SessionUnauthenticated(rejection));
    env.correlation_id = Some(correlation_id);
    env.session_id = Some(state.session_id.clone());
    let _ = out.send(env).await;
    Ok(())
}
#[allow(clippy::too_many_arguments, clippy::too_many_lines)]
async fn spawn_tool_invoke(
&self,
out: &mpsc::Sender<Envelope>,
jobs: &JobRegistry,
correlation_id: MessageId,
session_id: SessionId,
principal: Option<String>,
idempotency_key: Option<IdempotencyKey>,
payload: ToolInvokePayload,
) {
let idempotency_scope = idempotency_key.as_ref().map(|key| IdempotencyScope {
principal_or_session: principal.clone().unwrap_or_else(|| session_id.to_string()),
idempotency_key: key.clone(),
});
let canonical_args = serde_json::to_string(&payload.arguments).unwrap_or_default();
if let Some(scope) = idempotency_scope.as_ref() {
if let Some(record) = self.inner.idempotency_index.get(scope) {
if record.tool == payload.tool && record.arguments_canonical == canonical_args {
let mut accepted =
Envelope::new(MessageType::JobAccepted(record.accepted.clone()));
accepted.correlation_id = Some(correlation_id);
accepted.session_id = Some(session_id);
accepted.job_id = Some(record.accepted.job_id.clone());
accepted.idempotency_key = idempotency_key;
let _ = out.send(accepted).await;
return;
}
let mut err = Envelope::new(MessageType::JobFailed(JobFailedPayload {
code: ErrorCode::FailedPrecondition,
retryable: Some(false),
message: format!(
"idempotency key {} already bound to a different command intent",
scope.idempotency_key
),
details: None,
}));
err.correlation_id = Some(correlation_id);
err.session_id = Some(session_id);
err.idempotency_key = idempotency_key;
let _ = out.send(err).await;
return;
}
}
let job_id = JobId::new();
let agent_ref = match arcp_core::messages::AgentRef::parse(&payload.tool) {
Ok(r) => r,
Err(e) => {
let mut err = Envelope::new(MessageType::JobFailed(JobFailedPayload {
code: ErrorCode::InvalidArgument,
retryable: Some(false),
message: format!("invalid agent reference {}: {e}", payload.tool),
details: None,
}));
err.correlation_id = Some(correlation_id);
err.session_id = Some(session_id);
err.job_id = Some(job_id);
let _ = out.send(err).await;
return;
}
};
let lease = effective_lease(&payload);
let defer_accepted = self.inner.credential_provisioner.is_some() && lease.is_some();
let accepted_sent = if defer_accepted {
false
} else {
let mut accepted = Envelope::new(MessageType::JobAccepted(JobAcceptedPayload {
job_id: job_id.clone(),
credentials: vec![],
lease: lease.clone(),
}));
accepted.correlation_id = Some(correlation_id.clone());
accepted.session_id = Some(session_id.clone());
accepted.job_id = Some(job_id.clone());
let _ = out.send(accepted).await;
true
};
if agent_ref.version.is_some() {
let advertised = &self.inner.advertised_capabilities.agents;
let satisfied = advertised
.as_ref()
.is_some_and(|inv| inv.satisfies(&agent_ref));
if !satisfied {
let mut err = Envelope::new(MessageType::JobFailed(JobFailedPayload {
code: ErrorCode::AgentVersionNotAvailable,
retryable: Some(false),
message: format!("agent version not available: {}", agent_ref.format()),
details: None,
}));
err.correlation_id = Some(correlation_id);
err.session_id = Some(session_id);
err.job_id = Some(job_id);
let _ = out.send(err).await;
return;
}
}
let Some(handler) = self.inner.tools.get(&agent_ref.name) else {
let mut err = Envelope::new(MessageType::JobFailed(JobFailedPayload {
code: ErrorCode::NotFound,
retryable: Some(false),
message: format!("tool not registered: {}", agent_ref.name),
details: None,
}));
err.correlation_id = Some(correlation_id);
err.session_id = Some(session_id);
err.job_id = Some(job_id);
let _ = out.send(err).await;
return;
};
let credentials = if let (Some(provisioner), Some(lease_ref)) =
(&self.inner.credential_provisioner, lease.as_ref())
{
let ctx = CredentialJobContext {
job_id: job_id.clone(),
session_id: session_id.clone(),
principal: principal.clone(),
parent_job_id: None,
};
match provisioner.issue(lease_ref, &ctx).await {
Ok(credentials) => {
self.inner
.credential_ledger
.record_issued(&job_id, &credentials);
credentials
}
Err(e) => {
let mut err = Envelope::new(MessageType::JobFailed(JobFailedPayload {
code: e.code(),
retryable: Some(e.retryable()),
message: e.to_string(),
details: None,
}));
err.correlation_id = Some(correlation_id);
err.session_id = Some(session_id);
err.job_id = Some(job_id);
let _ = out.send(err).await;
return;
}
}
} else {
Vec::new()
};
if !accepted_sent {
let mut accepted = Envelope::new(MessageType::JobAccepted(JobAcceptedPayload {
job_id: job_id.clone(),
credentials: credentials.clone(),
lease: lease.clone(),
}));
accepted.correlation_id = Some(correlation_id.clone());
accepted.session_id = Some(session_id.clone());
accepted.job_id = Some(job_id.clone());
let _ = out.send(accepted).await;
}
let cancel = CancellationToken::new();
let entry = JobEntry {
job_id: job_id.clone(),
session_id: session_id.clone(),
correlation_id: correlation_id.clone(),
cancel: cancel.clone(),
state: JobState::Accepted,
agent: agent_ref.format(),
created_at: chrono::Utc::now(),
last_event_seq: 0,
parent_job_id: None,
credential_ids: self.inner.credential_ledger.outstanding_for_job(&job_id),
lease: lease.clone(),
};
let out_clone = out.clone();
let jobs_clone = jobs.clone();
let provisioner_clone = self.inner.credential_provisioner.clone();
let credential_ledger_clone = self.inner.credential_ledger.clone();
let cancel_for_task = cancel;
let budget_tracker = lease
.as_ref()
.and_then(|lease| lease.cost_budget.as_ref())
.map_or_else(crate::runtime::context::BudgetTracker::new, |budget| {
crate::runtime::context::BudgetTracker::from_budget(budget)
});
if let Some(scope) = idempotency_scope {
self.inner.idempotency_index.insert(
scope,
IdempotencyRecord {
accepted: JobAcceptedPayload {
job_id: job_id.clone(),
credentials: credentials.clone(),
lease: lease.clone(),
},
tool: agent_ref.format(),
arguments_canonical: canonical_args,
},
);
}
let join = tokio::spawn(async move {
let mut started = Envelope::new(MessageType::JobStarted(JobStartedPayload {
description: Some(format!("invoking {}", payload.tool)),
}));
started.correlation_id = Some(correlation_id.clone());
started.session_id = Some(session_id.clone());
started.job_id = Some(job_id.clone());
let _ = out_clone.send(started).await;
jobs_clone.set_state(&job_id, JobState::Running);
let ctx = ToolContext {
cancel: cancel_for_task.clone(),
job_id: job_id.clone(),
session_id: session_id.clone(),
correlation_id: correlation_id.clone(),
out: out_clone.clone(),
budget: budget_tracker,
lease,
};
let outcome = tokio::select! {
() = cancel_for_task.cancelled() => Outcome::Cancelled("cancellation token fired".into()),
result = handler.invoke(payload.arguments, ctx) => match result {
Ok(value) => Outcome::Completed(value),
Err(ARCPError::Cancelled { reason }) => Outcome::Cancelled(reason),
Err(e) => Outcome::Failed(e),
},
};
let terminal = match outcome {
Outcome::Completed(value) => {
jobs_clone.set_state(&job_id, JobState::Completed);
let completed = streamed_result_from_value(value);
MessageType::JobCompleted(completed)
}
Outcome::Failed(e) => {
jobs_clone.set_state(&job_id, JobState::Failed);
MessageType::JobFailed(JobFailedPayload {
code: e.code(),
retryable: Some(e.retryable()),
message: e.to_string(),
details: None,
})
}
Outcome::Cancelled(reason) => {
jobs_clone.set_state(&job_id, JobState::Cancelled);
MessageType::JobCancelled(JobCancelledPayload {
reason: Some(reason),
})
}
};
let mut term = Envelope::new(terminal);
term.correlation_id = Some(correlation_id);
term.session_id = Some(session_id);
term.job_id = Some(job_id.clone());
let _ = out_clone.send(term).await;
if let Some(provisioner) = provisioner_clone.as_ref() {
if let Err(e) =
revoke_all_for_job(&credential_ledger_clone, provisioner, &job_id).await
{
tracing::warn!(error = %e, job_id = %job_id, "failed to revoke credentials");
}
}
});
jobs.insert(entry, join);
}
/// Handles a `cancel` request from `requester`.
///
/// Only job targets are implemented. A job may be cancelled by the session
/// that submitted it, or by another session for which
/// `cancel_principal_matches` holds. The outcome is reported as
/// `cancel.accepted` or `cancel.refused`. Stream and session targets are only
/// logged.
async fn handle_cancel(
    &self,
    out: &mpsc::Sender<Envelope>,
    jobs: &JobRegistry,
    correlation_id: MessageId,
    requester: &SessionState,
    payload: &CancelPayload,
) {
    let CancelPayload {
        target, target_id, ..
    } = payload;
    match target {
        CancelTargetKind::Stream | CancelTargetKind::Session => {
            tracing::warn!(?target, "cancel target not yet implemented");
        }
        CancelTargetKind::Job => {
            let refuse = |reason: &'static str| {
                MessageType::CancelRefused(arcp_core::messages::CancelRefusedPayload {
                    target_id: target_id.clone(),
                    reason: reason.into(),
                })
            };
            let response_payload = match target_id.parse::<JobId>() {
                Err(_) => refuse("malformed job id"),
                Ok(job_id) => match jobs.snapshot(&job_id) {
                    None => refuse("no such in-flight job"),
                    Some(snap) => {
                        let same_session = snap.session_id == requester.session_id;
                        let authorized = same_session
                            || cancel_principal_matches(
                                &self.inner.session_principals,
                                &snap.session_id,
                                requester.principal.as_deref(),
                            );
                        if !authorized {
                            refuse("permission denied: not authorized to cancel this job")
                        } else if jobs.cancel(&job_id) {
                            MessageType::CancelAccepted(
                                arcp_core::messages::CancelAcceptedPayload {
                                    target_id: Some(target_id.clone()),
                                },
                            )
                        } else {
                            refuse("job is no longer in-flight")
                        }
                    }
                },
            };
            let mut env = Envelope::new(response_payload);
            env.correlation_id = Some(correlation_id);
            env.session_id = Some(requester.session_id.clone());
            let _ = out.send(env).await;
        }
    }
}
/// Computes the session's capabilities from the runtime's advertised set
/// and the client's offer.
///
/// - Boolean features are combined with `intersect_bool`.
/// - Extensions are kept only when both sides list them, in the runtime's
///   order.
/// - Heartbeat recovery, binary encoding, artifact retention and the agent
///   inventory are copied from the runtime.
/// - `extra` starts empty.
fn negotiate_capabilities(&self, client_caps: &Capabilities) -> Capabilities {
    let ours = &self.inner.advertised_capabilities;
    let theirs = client_caps;
    Capabilities {
        streaming: intersect_bool(ours.streaming, theirs.streaming),
        durable_jobs: intersect_bool(ours.durable_jobs, theirs.durable_jobs),
        checkpoints: intersect_bool(ours.checkpoints, theirs.checkpoints),
        binary_streams: intersect_bool(ours.binary_streams, theirs.binary_streams),
        agent_handoff: intersect_bool(ours.agent_handoff, theirs.agent_handoff),
        model_use: intersect_bool(ours.model_use, theirs.model_use),
        provisioned_credentials: intersect_bool(
            ours.provisioned_credentials,
            theirs.provisioned_credentials,
        ),
        artifacts: intersect_bool(ours.artifacts, theirs.artifacts),
        subscriptions: intersect_bool(ours.subscriptions, theirs.subscriptions),
        scheduled_jobs: intersect_bool(ours.scheduled_jobs, theirs.scheduled_jobs),
        interrupt: intersect_bool(ours.interrupt, theirs.interrupt),
        anonymous: intersect_bool(ours.anonymous, theirs.anonymous),
        heartbeat_recovery: ours.heartbeat_recovery.clone(),
        binary_encoding: ours.binary_encoding.clone(),
        extensions: ours
            .extensions
            .iter()
            .filter(|ext| theirs.extensions.contains(ext))
            .cloned()
            .collect(),
        artifact_retention: ours.artifact_retention.clone(),
        agents: ours.agents.clone(),
        extra: std::collections::BTreeMap::new(),
    }
}
/// Sends `session.accepted` for `session_id`.
///
/// The message carries the negotiated `capabilities`, the runtime identity
/// and, when configured, a lease expiring `session_lease_seconds` from now.
///
/// The lease length is clamped to about 100 years. Very large configured
/// values (e.g. `u64::MAX` meaning "forever") would otherwise make
/// `chrono::Duration::seconds` or the `DateTime` addition panic on overflow,
/// taking down the connection task.
async fn send_accepted(
    &self,
    out: &mpsc::Sender<Envelope>,
    correlation_id: MessageId,
    session_id: &SessionId,
    capabilities: &Capabilities,
) {
    /// Longest lease ever reported: far below chrono's range limits.
    const MAX_LEASE_SECONDS: i64 = 100 * 365 * 24 * 60 * 60;
    let lease = self.inner.session_lease_seconds.map(|s| {
        let seconds = i64::try_from(s)
            .unwrap_or(i64::MAX)
            .min(MAX_LEASE_SECONDS);
        SessionLease {
            expires_at: chrono::Utc::now() + chrono::Duration::seconds(seconds),
        }
    });
    let mut env = Envelope::new(MessageType::SessionAccepted(SessionAcceptedPayload {
        session_id: session_id.clone(),
        runtime: self.inner.runtime_identity.clone(),
        capabilities: capabilities.clone(),
        lease,
    }));
    env.correlation_id = Some(correlation_id);
    env.session_id = Some(session_id.clone());
    let _ = out.send(env).await;
}
/// Sends `session.rejected` with `code` and `message`, correlated to the
/// request. No session id is attached.
async fn send_rejected(
    &self,
    out: &mpsc::Sender<Envelope>,
    correlation_id: MessageId,
    code: ErrorCode,
    message: String,
) {
    let rejection = SessionRejectedPayload { code, message };
    let mut env = Envelope::new(MessageType::SessionRejected(rejection));
    env.correlation_id = Some(correlation_id);
    let _ = out.send(env).await;
}
async fn handle_subscribe(
out: &mpsc::Sender<Envelope>,
manager: &SubscriptionManager,
connection_subs: &Arc<DashMap<SubscriptionId, JoinHandle<()>>>,
correlation_id: MessageId,
session_id: SessionId,
payload: SubscribePayload,
) {
let SubscribePayload { filter, since: _ } = payload;
let (subscription_id, mut rx) = manager.register(filter, session_id.clone());
let mut accepted =
Envelope::new(MessageType::SubscribeAccepted(SubscribeAcceptedPayload {
subscription_id: subscription_id.clone(),
}));
accepted.correlation_id = Some(correlation_id);
accepted.session_id = Some(session_id);
accepted.subscription_id = Some(subscription_id.clone());
let _ = out.send(accepted).await;
let out_clone = out.clone();
let sub_id = subscription_id.clone();
let join = tokio::spawn(async move {
while let Some(event) = rx.next().await {
let value = match serde_json::to_value(&event) {
Ok(v) => v,
Err(e) => {
tracing::warn!(error = %e, "subscribe.event serialise failed");
continue;
}
};
let mut wrapper =
Envelope::new(MessageType::SubscribeEvent(SubscribeEventPayload {
event: value,
}));
wrapper.subscription_id = Some(sub_id.clone());
if out_clone.send(wrapper).await.is_err() {
break;
}
}
});
connection_subs.insert(subscription_id, join);
}
/// Handles `job.subscribe`, letting a session follow a job's events.
///
/// Authorization: the subscriber must be the submitting session, or share a
/// non-`None` principal with it. Otherwise a `PermissionDenied` nack is sent.
/// Unknown jobs get a `NotFound` nack.
///
/// On success a `job.subscribed` ack is sent, and a forwarder relays
/// forwardable job events re-stamped with the subscriber's session id. The
/// forwarder is tracked in `connection_job_subs` under the job id; a repeat
/// subscribe replaces (and aborts) the previous forwarder.
///
/// `from_event_seq` and `history` are currently ignored (no replay).
///
/// NOTE(review): the internal subscription registered with `manager` is never
/// unsubscribed when the forwarder ends or is aborted. Confirm whether
/// `SubscriptionManager` prunes it on its own.
#[allow(clippy::too_many_arguments)]
async fn handle_job_subscribe(
    out: &mpsc::Sender<Envelope>,
    manager: &SubscriptionManager,
    jobs: &JobRegistry,
    session_principals: &DashMap<SessionId, Option<String>>,
    connection_job_subs: &Arc<DashMap<JobId, JoinHandle<()>>>,
    correlation_id: MessageId,
    subscriber_session: SessionId,
    subscriber_principal: Option<String>,
    payload: JobSubscribePayload,
) {
    let JobSubscribePayload {
        job_id,
        from_event_seq: _,
        history: _,
    } = payload;
    let Some(snap) = jobs.snapshot(&job_id) else {
        let mut err = Envelope::new(MessageType::Nack(NackPayload {
            code: ErrorCode::NotFound,
            message: format!("no such job: {job_id}"),
            details: None,
        }));
        err.correlation_id = Some(correlation_id);
        err.session_id = Some(subscriber_session);
        let _ = out.send(err).await;
        return;
    };
    if snap.session_id != subscriber_session {
        let submitter_principal = session_principals
            .get(&snap.session_id)
            .and_then(|p| p.value().clone());
        // Cross-session access requires both sides to carry the same,
        // non-anonymous principal.
        let permitted = submitter_principal
            .as_ref()
            .zip(subscriber_principal.as_ref())
            .is_some_and(|(a, b)| a == b);
        if !permitted {
            let mut err = Envelope::new(MessageType::Nack(NackPayload {
                code: ErrorCode::PermissionDenied,
                message: "principal not authorized to subscribe to this job".into(),
                details: None,
            }));
            err.correlation_id = Some(correlation_id);
            err.session_id = Some(subscriber_session);
            err.job_id = Some(job_id);
            let _ = out.send(err).await;
            return;
        }
    }
    let filter = arcp_core::messages::SubscriptionFilter {
        job_id: vec![job_id.clone()],
        ..arcp_core::messages::SubscriptionFilter::default()
    };
    let (_internal_id, mut rx) = manager.register(filter, subscriber_session.clone());
    let ack = JobSubscribedPayload {
        job_id: job_id.clone(),
        current_status: snap.state.wire_str().to_owned(),
        agent: snap.agent.clone(),
        parent_job_id: snap.parent_job_id.clone(),
        trace_id: None,
        subscribed_from: snap.last_event_seq,
        replayed: false,
    };
    let mut ack_env = Envelope::new(MessageType::JobSubscribed(ack));
    ack_env.correlation_id = Some(correlation_id);
    ack_env.session_id = Some(subscriber_session.clone());
    ack_env.job_id = Some(job_id.clone());
    let _ = out.send(ack_env).await;
    let out_clone = out.clone();
    let forward_session = subscriber_session;
    let job_id_clone = job_id.clone();
    let connection_job_subs_clone = Arc::clone(connection_job_subs);
    let join = tokio::spawn(async move {
        while let Some(mut env) = rx.next().await {
            if !is_forwardable_job_event(&env.payload) {
                continue;
            }
            env.session_id = Some(forward_session.clone());
            if out_clone.send(env).await.is_err() {
                break;
            }
        }
        connection_job_subs_clone.remove(&job_id_clone);
    });
    // A repeat job.subscribe for the same job replaces the tracked forwarder.
    // Dropping a JoinHandle only detaches the task, so abort the old one
    // explicitly; otherwise it keeps running and every event arrives twice.
    if let Some(previous) = connection_job_subs.insert(job_id, join) {
        previous.abort();
    }
}
/// Stores an uploaded artifact and answers the request.
///
/// On success the reply is an `ArtifactRef` envelope carrying the reference
/// returned by `store.put`. On failure it is a `Nack` whose code and message
/// come from the store error. Either way the reply is correlated to
/// `correlation_id` and stamped with `session_id`. A failed send (e.g. the
/// outbound channel is closed) is deliberately ignored.
async fn handle_artifact_put(
    out: &mpsc::Sender<Envelope>,
    store: &ArtifactStore,
    correlation_id: MessageId,
    session_id: SessionId,
    payload: ArtifactPutPayload,
) {
    let ArtifactPutPayload {
        media_type,
        data,
        sha256,
        retain_seconds,
    } = payload;
    let reply = store.put(media_type, &data, retain_seconds, sha256).map_or_else(
        |err| {
            Envelope::new(MessageType::Nack(NackPayload {
                code: err.code(),
                message: err.to_string(),
                details: None,
            }))
        },
        |artifact| Envelope::new(MessageType::ArtifactRef(ArtifactRefPayload { artifact })),
    );
    let mut reply = reply;
    reply.correlation_id = Some(correlation_id);
    reply.session_id = Some(session_id);
    // Best-effort: a closed outbound channel means the peer is gone.
    let _ = out.send(reply).await;
}
/// Looks up `artifact_id` in the store and answers the request.
///
/// On success the artifact bytes and media type are returned in an
/// `ArtifactPut` envelope, with `sha256` and `retain_seconds` left unset.
/// On failure the reply is a `Nack` built from the store error's code and
/// message. The reply is correlated to `correlation_id` and stamped with
/// `session_id`. A failed send is deliberately ignored.
async fn handle_artifact_fetch(
    out: &mpsc::Sender<Envelope>,
    store: &ArtifactStore,
    correlation_id: MessageId,
    session_id: SessionId,
    payload: ArtifactFetchPayload,
) {
    let ArtifactFetchPayload { artifact_id } = payload;
    let body = match store.fetch(&artifact_id) {
        Err(err) => MessageType::Nack(NackPayload {
            code: err.code(),
            message: err.to_string(),
            details: None,
        }),
        Ok((data, media_type)) => MessageType::ArtifactPut(ArtifactPutPayload {
            media_type,
            data,
            sha256: None,
            retain_seconds: None,
        }),
    };
    let mut reply = Envelope::new(body);
    reply.correlation_id = Some(correlation_id);
    reply.session_id = Some(session_id);
    // Best-effort: a closed outbound channel means the peer is gone.
    let _ = out.send(reply).await;
}
}
/// Final result of running a job's work, one variant per terminal state.
///
/// NOTE(review): presumably each variant is mapped to a `job.completed`,
/// `job.failed` or `job.cancelled` envelope elsewhere in this file — confirm.
enum Outcome {
/// Finished successfully with a JSON result value.
Completed(serde_json::Value),
/// Finished with an error.
Failed(ARCPError),
/// Was cancelled; the string is presumably the cancellation reason — confirm.
Cancelled(String),
}
/// Determines the lease request that applies to a tool invocation.
///
/// An explicit `lease_request` on the payload takes precedence. Otherwise, if
/// only a `cost_budget` was supplied, a lease carrying just that budget is
/// synthesized, with no model-use constraint, expiry or extra fields. If
/// neither is present, no lease applies.
fn effective_lease(payload: &ToolInvokePayload) -> Option<LeaseRequest> {
    match (&payload.lease_request, &payload.cost_budget) {
        (Some(explicit), _) => Some(explicit.clone()),
        (None, Some(budget)) => Some(LeaseRequest {
            cost_budget: Some(budget.clone()),
            model_use: None,
            expires_at: None,
            extra: std::collections::BTreeMap::new(),
        }),
        (None, None) => None,
    }
}
/// Returns a copy of `env` that is safe to fan out to job subscribers.
///
/// The only redaction performed is emptying the `credentials` list of a
/// `JobAccepted` payload. Every other envelope is returned as an unchanged
/// clone. The input envelope is never modified.
fn redact_for_subscribers(env: &Envelope) -> Envelope {
    let mut redacted = env.clone();
    if let MessageType::JobAccepted(accepted) = &mut redacted.payload {
        accepted.credentials.clear();
    }
    redacted
}
/// Object key that marks a tool return value as a reference to a streamed
/// result rather than an inline value. `streamed_result_from_value` in this
/// file recognizes a single-key object `{ "$arcp_streamed_result": { "result_id": .., .. } }`.
pub const STREAMED_RESULT_SENTINEL: &str = "$arcp_streamed_result";
/// Builds a `JobCompletedPayload` from a tool's JSON return value.
///
/// If `value` is an object with exactly one key, [`STREAMED_RESULT_SENTINEL`],
/// and that key maps to something with a string `result_id`, the payload
/// describes a streamed result. `result_id` is copied, and `result_size` (u64)
/// and `summary` (string) are copied when present with those types. No inline
/// `value` is set in that case.
///
/// Anything else is returned inline as `value`. That includes a sentinel
/// object whose `result_id` is missing or not a string.
fn streamed_result_from_value(value: serde_json::Value) -> JobCompletedPayload {
    let streamed = value
        .as_object()
        .filter(|obj| obj.len() == 1)
        .and_then(|obj| obj.get(STREAMED_RESULT_SENTINEL))
        .and_then(|inner| {
            // A string `result_id` is mandatory; without it we fall back to inline.
            let id = inner
                .get("result_id")
                .and_then(serde_json::Value::as_str)?
                .to_owned();
            Some(JobCompletedPayload {
                value: None,
                result_ref: None,
                result_id: Some(id),
                result_size: inner.get("result_size").and_then(serde_json::Value::as_u64),
                summary: inner
                    .get("summary")
                    .and_then(serde_json::Value::as_str)
                    .map(str::to_owned),
            })
        });
    streamed.unwrap_or_else(|| JobCompletedPayload {
        value: Some(value),
        result_ref: None,
        result_id: None,
        result_size: None,
        summary: None,
    })
}
/// Reports whether `payload` is a message kind that a job subscription
/// forwards to its subscriber.
///
/// The allow-list is job lifecycle messages (`JobAccepted` through
/// `JobResultChunk`), tool results/errors, logs and metrics, stream
/// open/chunk/close/error, and artifact references. Every other message type
/// is not forwarded.
const fn is_forwardable_job_event(payload: &MessageType) -> bool {
    let job_lifecycle = matches!(
        payload,
        MessageType::JobAccepted(_)
            | MessageType::JobStarted(_)
            | MessageType::JobProgress(_)
            | MessageType::JobHeartbeat(_)
            | MessageType::JobCompleted(_)
            | MessageType::JobFailed(_)
            | MessageType::JobCancelled(_)
            | MessageType::JobResultChunk(_)
    );
    let tool_output = matches!(payload, MessageType::ToolResult(_) | MessageType::ToolError(_));
    let telemetry = matches!(payload, MessageType::Log(_) | MessageType::Metric(_));
    let streaming = matches!(
        payload,
        MessageType::StreamOpen(_)
            | MessageType::StreamChunk(_)
            | MessageType::StreamClose(_)
            | MessageType::StreamError(_)
    );
    let artifact = matches!(payload, MessageType::ArtifactRef(_));
    job_lifecycle || tool_output || telemetry || streaming || artifact
}
/// Decides whether a cancel requester shares the principal of the session
/// that owns the target.
///
/// Returns `true` only when `requester_principal` is `Some` and equals the
/// principal recorded for `owning_session` in `session_principals`. The
/// function denies (`false`) when:
/// - there is no requester principal,
/// - the owning session is unknown, or
/// - the owning session was recorded without a principal.
fn cancel_principal_matches(
    session_principals: &DashMap<SessionId, Option<String>>,
    owning_session: &SessionId,
    requester_principal: Option<&str>,
) -> bool {
    let Some(requester_principal) = requester_principal else {
        return false;
    };
    // Compare through the map guard instead of cloning the stored `String`
    // out of it; the guard is released as soon as the closure returns.
    session_principals
        .get(owning_session)
        .is_some_and(|entry| entry.value().as_deref() == Some(requester_principal))
}
/// Combines two optional boolean flags conservatively.
///
/// The result is `None` only when both inputs are `None`. Otherwise it is
/// `Some(true)` exactly when both sides are `Some(true)`, and `Some(false)`
/// in every other case, including when one side is `None`.
const fn intersect_bool(a: Option<bool>, b: Option<bool>) -> Option<bool> {
    match (a, b) {
        (None, None) => None,
        (Some(true), Some(true)) => Some(true),
        _ => Some(false),
    }
}