1use std::collections::HashSet;
5use std::sync::atomic::{AtomicU64, Ordering};
6use std::sync::Arc;
7
8use dashmap::DashMap;
9use tokio::sync::{mpsc, Notify};
10use tokio::task::JoinHandle;
11use tokio_util::sync::CancellationToken;
12
13use super::artifact::ArtifactStore;
14use super::context::ToolContext;
15use super::credentials::{
16 revoke_all_for_job, CredentialJobContext, CredentialLedger, CredentialProvisioner,
17};
18use super::job::{JobEntry, JobRegistry};
19use super::session::{HandshakePhase, SessionState};
20use super::subscription::SubscriptionManager;
21use super::tools::ToolRegistry;
22use crate::store::eventlog::EventLog;
23use arcp_core::auth::{AuthOutcome, AuthRegistry, Authenticator};
24use arcp_core::envelope::Envelope;
25use arcp_core::error::{ARCPError, ErrorCode};
26use arcp_core::extensions::ExtensionRegistry;
27use arcp_core::ids::IdempotencyKey;
28use arcp_core::ids::SubscriptionId;
29use arcp_core::ids::{JobId, MessageId, SessionId};
30use arcp_core::messages::{
31 ArtifactFetchPayload, ArtifactPutPayload, ArtifactRefPayload, ArtifactReleasePayload,
32 CancelPayload, CancelTargetKind, Capabilities, JobAcceptedPayload, JobCancelledPayload,
33 JobCompletedPayload, JobFailedPayload, JobStartedPayload, JobState, JobSubscribePayload,
34 JobSubscribedPayload, JobUnsubscribePayload, LeaseRequest, MessageType, NackPayload,
35 RuntimeIdentity, SessionAcceptedPayload, SessionLease, SessionOpenPayload,
36 SessionRejectedPayload, SessionUnauthenticatedPayload, SubscribeAcceptedPayload,
37 SubscribeEventPayload, SubscribePayload, ToolInvokePayload, UnsubscribePayload,
38};
39use arcp_core::transport::Transport;
40use arcp_core::{IMPL_KIND, IMPL_VERSION};
41
/// Builder for [`ARCPRuntime`].
///
/// Collects authenticators, tools, advertised capabilities, the runtime
/// identity, session-lease and ack-window settings, and an optional credential
/// provisioner. [`RuntimeBuilder::build`] validates the combination and produces
/// the runtime.
pub struct RuntimeBuilder {
    /// Authenticators, looked up by auth scheme during the handshake.
    auth: AuthRegistry,
    /// Tool handlers available to `tool.invoke`.
    tools: ToolRegistry,
    /// Capabilities offered to clients during `session.open`.
    advertised_capabilities: Capabilities,
    /// Identity reported to clients in `session.accepted`.
    runtime_identity: RuntimeIdentity,
    /// Session lease length in seconds; `None` sends no lease.
    session_lease_seconds: Option<u64>,
    /// Maximum number of un-acked countable events in flight per connection;
    /// `None` disables ack-based flow control.
    ack_window: Option<u64>,
    /// Issues per-job credentials when set. It is required whenever
    /// `provisioned_credentials` is advertised (checked in `build`).
    credential_provisioner: Option<Arc<dyn CredentialProvisioner>>,
}
52
53impl Default for RuntimeBuilder {
54 fn default() -> Self {
55 Self::new()
56 }
57}
58
59impl std::fmt::Debug for RuntimeBuilder {
60 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
61 f.debug_struct("RuntimeBuilder")
62 .field("advertised_capabilities", &self.advertised_capabilities)
63 .field("runtime_identity", &self.runtime_identity)
64 .field("session_lease_seconds", &self.session_lease_seconds)
65 .finish_non_exhaustive()
66 }
67}
68
69impl RuntimeBuilder {
70 #[must_use]
73 pub fn new() -> Self {
74 Self {
75 auth: AuthRegistry::new(),
76 tools: ToolRegistry::new(),
77 advertised_capabilities: Capabilities::default(),
78 runtime_identity: RuntimeIdentity {
79 kind: IMPL_KIND.to_owned(),
80 version: IMPL_VERSION.to_owned(),
81 fingerprint: None,
82 trust_level: Some("trusted".into()),
83 },
84 session_lease_seconds: Some(3600),
85 ack_window: None,
86 credential_provisioner: None,
87 }
88 }
89
90 #[must_use]
92 pub fn with_authenticator(mut self, auth: Box<dyn Authenticator>) -> Self {
93 self.auth.register(auth);
94 self
95 }
96
97 #[must_use]
99 pub fn with_tools(mut self, tools: ToolRegistry) -> Self {
100 self.tools = tools;
101 self
102 }
103
104 #[must_use]
106 pub fn with_capabilities(mut self, caps: Capabilities) -> Self {
107 self.advertised_capabilities = caps;
108 self
109 }
110
111 #[must_use]
113 pub fn with_identity(mut self, ident: RuntimeIdentity) -> Self {
114 self.runtime_identity = ident;
115 self
116 }
117
118 #[must_use]
120 pub const fn with_session_lease_seconds(mut self, seconds: u64) -> Self {
121 self.session_lease_seconds = Some(seconds);
122 self
123 }
124
125 #[must_use]
136 pub const fn with_ack_window(mut self, window: u64) -> Self {
137 self.ack_window = if window == 0 { None } else { Some(window) };
138 self
139 }
140
141 #[must_use]
143 pub fn with_credential_provisioner(
144 mut self,
145 provisioner: Arc<dyn CredentialProvisioner>,
146 ) -> Self {
147 self.credential_provisioner = Some(provisioner);
148 self.advertised_capabilities.model_use = Some(true);
149 self.advertised_capabilities.provisioned_credentials = Some(true);
150 self
151 }
152
153 pub async fn build(self) -> Result<ARCPRuntime, ARCPError> {
161 if self.advertised_capabilities.provisioned_credentials == Some(true)
162 && self.credential_provisioner.is_none()
163 {
164 return Err(ARCPError::FailedPrecondition {
165 detail: "provisioned_credentials advertised without a CredentialProvisioner".into(),
166 });
167 }
168 let event_log = EventLog::in_memory().await?;
169 Ok(ARCPRuntime {
170 inner: Arc::new(RuntimeInner {
171 auth: self.auth,
172 tools: self.tools,
173 advertised_capabilities: self.advertised_capabilities,
174 runtime_identity: self.runtime_identity,
175 session_lease_seconds: self.session_lease_seconds,
176 ack_window: self.ack_window,
177 extension_registry: ExtensionRegistry::new(),
178 event_log,
179 artifacts: ArtifactStore::new(),
180 subscriptions: SubscriptionManager::new(),
181 jobs: JobRegistry::new(),
182 session_principals: DashMap::new(),
183 credential_provisioner: self.credential_provisioner,
184 credential_ledger: CredentialLedger::new(),
185 idempotency_index: DashMap::new(),
186 }),
187 })
188 }
189}
190
/// Shared state behind every [`ARCPRuntime`] handle.
struct RuntimeInner {
    /// Authenticators keyed by auth scheme.
    auth: AuthRegistry,
    /// Tool handlers, looked up by agent name on `tool.invoke`.
    tools: ToolRegistry,
    /// Capabilities offered to clients; intersected with the client's set
    /// during `session.open`.
    advertised_capabilities: Capabilities,
    /// Identity reported in `session.accepted`.
    runtime_identity: RuntimeIdentity,
    /// Session lease length in seconds; `None` omits the lease.
    session_lease_seconds: Option<u64>,
    /// Per-connection limit on un-acked countable events; `None` disables
    /// ack-based flow control.
    ack_window: Option<u64>,
    // NOTE(review): not read in the code visible here, presumably the reason
    // for the dead_code allowance.
    #[allow(dead_code)]
    extension_registry: ExtensionRegistry,
    /// Log that inbound and outbound envelopes are appended to.
    event_log: EventLog,
    /// Storage backing `artifact.put` / `artifact.fetch` / `artifact.release`.
    artifacts: ArtifactStore,
    /// Subscription fan-out. Inbound envelopes and (redacted) outbound
    /// envelopes other than `subscribe.event` are published here.
    subscriptions: SubscriptionManager,
    /// Registry of jobs started by `tool.invoke`.
    jobs: JobRegistry,
    /// Authenticated principal per live session.
    ///
    /// Inserted on handshake acceptance and removed when the connection ends.
    /// Consulted when authorizing cross-session cancels and job subscriptions.
    session_principals: DashMap<SessionId, Option<String>>,
    /// Optional issuer of per-job credentials.
    credential_provisioner: Option<Arc<dyn CredentialProvisioner>>,
    /// Credentials issued per job, so they can be revoked when the job ends.
    credential_ledger: CredentialLedger,
    /// `tool.invoke` idempotency records keyed by principal/session + key.
    ///
    /// NOTE(review): no eviction is visible in this file chunk, so this may
    /// grow without bound; confirm.
    idempotency_index: DashMap<IdempotencyScope, IdempotencyRecord>,
}
223
/// Key of the idempotency index.
///
/// An idempotency key is only meaningful within the authenticated principal,
/// or within the session when no principal is known.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
struct IdempotencyScope {
    /// The principal name, or the session id rendered as a string when the
    /// session has no principal.
    principal_or_session: String,
    /// Client-supplied idempotency key from the `tool.invoke` envelope.
    idempotency_key: IdempotencyKey,
}
232
/// What a previously accepted `tool.invoke` was bound to, so retries with the
/// same key can be replayed or rejected.
#[derive(Debug, Clone)]
struct IdempotencyRecord {
    /// The `job.accepted` payload to replay for a matching retry.
    accepted: JobAcceptedPayload,
    /// Tool string of the original request, compared against retries.
    tool: String,
    /// Arguments serialized with `serde_json::to_string`, compared against
    /// retries.
    arguments_canonical: String,
}
239
/// Handle to an ARCP runtime.
///
/// Cloning is cheap: all clones share the same state through an `Arc`.
#[derive(Clone)]
pub struct ARCPRuntime {
    inner: Arc<RuntimeInner>,
}
245
246impl std::fmt::Debug for ARCPRuntime {
247 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
248 f.debug_struct("ARCPRuntime").finish_non_exhaustive()
249 }
250}
251
252impl ARCPRuntime {
253 #[must_use]
255 pub fn builder() -> RuntimeBuilder {
256 RuntimeBuilder::new()
257 }
258
259 #[must_use]
261 pub fn event_log(&self) -> &EventLog {
262 &self.inner.event_log
263 }
264
265 #[must_use]
267 pub fn artifacts(&self) -> &ArtifactStore {
268 &self.inner.artifacts
269 }
270
271 #[must_use]
273 pub fn subscriptions(&self) -> &SubscriptionManager {
274 &self.inner.subscriptions
275 }
276
277 #[must_use]
282 pub fn serve_connection<T: Transport + 'static>(&self, transport: T) -> JoinHandle<()> {
283 let runtime = self.clone();
284 tokio::spawn(async move {
285 if let Err(e) = runtime.run_connection(transport).await {
286 tracing::warn!(error = %e, "connection terminated with error");
287 }
288 })
289 }
290
291 #[allow(clippy::too_many_lines, clippy::cognitive_complexity)]
298 pub async fn run_connection<T: Transport + 'static>(
299 &self,
300 transport: T,
301 ) -> Result<(), ARCPError> {
302 let transport = Arc::new(transport);
303 let (out_tx, mut out_rx) = mpsc::channel::<Envelope>(256);
307 let writer_transport = Arc::clone(&transport);
308 let event_log = self.inner.event_log.clone();
309 let writer_subs = self.inner.subscriptions.clone();
310 let ack_window = self.inner.ack_window;
315 let emitted = Arc::new(AtomicU64::new(0));
316 let last_ack = Arc::new(AtomicU64::new(0));
317 let ack_notify = Arc::new(Notify::new());
318 let writer_emitted = Arc::clone(&emitted);
319 let writer_last_ack = Arc::clone(&last_ack);
320 let writer_ack_notify = Arc::clone(&ack_notify);
321 let writer_jobs = self.inner.jobs.clone();
322 let writer = tokio::spawn(async move {
323 while let Some(mut env) = out_rx.recv().await {
324 let is_countable = env.payload.is_countable_event();
330 if is_countable {
331 if let Some(window) = ack_window {
332 loop {
333 let in_flight = writer_emitted
334 .load(Ordering::Acquire)
335 .saturating_sub(writer_last_ack.load(Ordering::Acquire));
336 if in_flight < window {
337 break;
338 }
339 writer_ack_notify.notified().await;
345 if out_rx.is_closed() {
346 return;
347 }
348 }
349 }
350 let seq = writer_emitted.fetch_add(1, Ordering::AcqRel) + 1;
356 env.event_seq = Some(seq);
357 if let Some(job_id) = env.job_id.as_ref() {
358 writer_jobs.record_event_seq(job_id, seq);
359 }
360 }
361 if let Err(e) = event_log.append(&env).await {
362 tracing::warn!(error = %e, "failed to persist outbound envelope");
363 }
364 if !matches!(env.payload, MessageType::SubscribeEvent(_)) {
370 let publish_env = redact_for_subscribers(&env);
371 let _ = writer_subs.publish(&publish_env);
372 }
373 if let Err(e) = writer_transport.send(env).await {
374 tracing::warn!(error = %e, "transport send failed; closing writer");
375 break;
376 }
377 }
378 });
379
380 let jobs = self.inner.jobs.clone();
381 let connection_subs: Arc<DashMap<SubscriptionId, JoinHandle<()>>> =
384 Arc::new(DashMap::new());
385 let connection_job_subs: Arc<DashMap<JobId, JoinHandle<()>>> = Arc::new(DashMap::new());
388 let mut state: Option<SessionState> = None;
389 let mut seen_ids: HashSet<MessageId> = HashSet::new();
390 let mut explicit_close = false;
394
395 let result = loop {
396 let Some(envelope) = transport.recv().await? else {
397 break Ok(());
398 };
399
400 if !seen_ids.insert(envelope.id.clone()) {
402 tracing::debug!(id = %envelope.id, "dropping replayed envelope");
403 continue;
404 }
405
406 self.inner.event_log.append(&envelope).await?;
408 let _ = self.inner.subscriptions.publish(&envelope);
410
411 let in_handshake = state.as_ref().is_none_or(|s| !s.is_accepted());
412 if in_handshake && !envelope.payload.is_handshake() {
413 tracing::warn!(
414 id = %envelope.id,
415 type_name = envelope.payload.type_name(),
416 "dropping non-handshake message before session.accepted",
417 );
418 continue;
419 }
420
421 match envelope.payload.clone() {
422 MessageType::SessionOpen(payload) => {
423 state = Some(
424 self.handle_session_open(&out_tx, envelope.id.clone(), payload)
425 .await?,
426 );
427 }
428 MessageType::SessionAuthenticate(payload) => {
429 if let Some(s) = state.as_mut() {
430 self.handle_session_authenticate(
431 &out_tx,
432 envelope.id.clone(),
433 s,
434 &payload.response,
435 )
436 .await?;
437 } else {
438 tracing::warn!("session.authenticate before session.open; dropping");
439 }
440 }
441 MessageType::SessionClose(_) => {
442 tracing::info!("session.close received");
443 explicit_close = true;
444 break Ok(());
445 }
446 MessageType::ToolInvoke(payload) => {
447 if let Some(s) = state.as_ref() {
448 self.spawn_tool_invoke(
449 &out_tx,
450 &jobs,
451 envelope.id.clone(),
452 s.session_id.clone(),
453 s.principal.clone(),
454 envelope.idempotency_key.clone(),
455 payload,
456 )
457 .await;
458 }
459 }
460 MessageType::Cancel(payload) => {
461 if let Some(s) = state.as_ref() {
462 self.handle_cancel(&out_tx, &jobs, envelope.id.clone(), s, &payload)
463 .await;
464 }
465 }
466 MessageType::Ping(_) => {
467 let mut env = Envelope::new(MessageType::Pong(
468 arcp_core::messages::PongPayload::default(),
469 ));
470 env.correlation_id = Some(envelope.id.clone());
471 if let Some(s) = state.as_ref() {
472 env.session_id = Some(s.session_id.clone());
473 }
474 let _ = out_tx.send(env).await;
475 }
476 MessageType::SessionPing(payload) => {
477 let mut env = Envelope::new(MessageType::SessionPong(
480 arcp_core::messages::SessionPongPayload {
481 ping_nonce: payload.nonce,
482 received_at: chrono::Utc::now(),
483 },
484 ));
485 env.correlation_id = Some(envelope.id.clone());
486 if let Some(s) = state.as_ref() {
487 env.session_id = Some(s.session_id.clone());
488 }
489 let _ = out_tx.send(env).await;
490 }
491 MessageType::SessionPong(_) => {
492 }
496 MessageType::SessionAck(payload) => {
497 let cur = last_ack.load(Ordering::Acquire);
500 if payload.last_processed_seq > cur {
501 last_ack.store(payload.last_processed_seq, Ordering::Release);
502 ack_notify.notify_waiters();
503 }
504 }
505 MessageType::SessionListJobs(payload) => {
506 if let Some(s) = state.as_ref() {
511 let jobs_list =
512 jobs.list_for_session(&s.session_id, payload.filter.as_ref());
513 let response =
514 MessageType::SessionJobs(arcp_core::messages::SessionJobsPayload {
515 request_id: envelope.id.to_string(),
516 jobs: jobs_list,
517 next_cursor: None,
518 });
519 let mut env = Envelope::new(response);
520 env.correlation_id = Some(envelope.id.clone());
521 env.session_id = Some(s.session_id.clone());
522 let _ = out_tx.send(env).await;
523 }
524 }
525 MessageType::Subscribe(payload) => {
526 if let Some(s) = state.as_ref() {
527 Self::handle_subscribe(
528 &out_tx,
529 &self.inner.subscriptions,
530 &connection_subs,
531 envelope.id.clone(),
532 s.session_id.clone(),
533 payload,
534 )
535 .await;
536 }
537 }
538 MessageType::Unsubscribe(UnsubscribePayload { subscription_id }) => {
539 if let Some((_, join)) = connection_subs.remove(&subscription_id) {
540 join.abort();
541 }
542 let _ = self.inner.subscriptions.unsubscribe(&subscription_id);
543 }
544 MessageType::JobSubscribe(payload) => {
545 if let Some(s) = state.as_ref() {
546 Self::handle_job_subscribe(
547 &out_tx,
548 &self.inner.subscriptions,
549 &self.inner.jobs,
550 &self.inner.session_principals,
551 &connection_job_subs,
552 envelope.id.clone(),
553 s.session_id.clone(),
554 s.principal.clone(),
555 payload,
556 )
557 .await;
558 }
559 }
560 MessageType::JobUnsubscribe(JobUnsubscribePayload { job_id }) => {
561 if let Some((_, join)) = connection_job_subs.remove(&job_id) {
562 join.abort();
563 }
564 }
565 MessageType::ArtifactPut(payload) => {
566 if let Some(s) = state.as_ref() {
567 Self::handle_artifact_put(
568 &out_tx,
569 &self.inner.artifacts,
570 envelope.id.clone(),
571 s.session_id.clone(),
572 payload,
573 )
574 .await;
575 }
576 }
577 MessageType::ArtifactFetch(payload) => {
578 if let Some(s) = state.as_ref() {
579 Self::handle_artifact_fetch(
580 &out_tx,
581 &self.inner.artifacts,
582 envelope.id.clone(),
583 s.session_id.clone(),
584 payload,
585 )
586 .await;
587 }
588 }
589 MessageType::ArtifactRelease(ArtifactReleasePayload { artifact_id }) => {
590 self.inner.artifacts.release(&artifact_id);
591 }
592 _ if in_handshake => {
593 tracing::warn!(
594 type_name = envelope.payload.type_name(),
595 "unexpected handshake message direction",
596 );
597 }
598 _ => {
599 tracing::debug!(
600 type_name = envelope.payload.type_name(),
601 "dispatch arm not yet implemented",
602 );
603 }
604 }
605 };
606
607 if explicit_close {
612 if let Some(s) = state.as_ref() {
613 for snap in jobs.list_for_session(&s.session_id, None) {
614 let _ = jobs.cancel(&snap.job_id);
615 }
616 }
617 }
618 if let Some(s) = state.as_ref() {
619 self.inner.session_principals.remove(&s.session_id);
620 }
621 for entry in connection_subs.iter() {
622 entry.value().abort();
623 }
624 connection_subs.clear();
625 for entry in connection_job_subs.iter() {
626 entry.value().abort();
627 }
628 connection_job_subs.clear();
629 drop(out_tx);
630 ack_notify.notify_waiters();
633 let _ = writer.await;
634 result
635 }
636
637 async fn handle_session_open(
638 &self,
639 out: &mpsc::Sender<Envelope>,
640 correlation_id: MessageId,
641 payload: SessionOpenPayload,
642 ) -> Result<SessionState, ARCPError> {
643 let SessionOpenPayload {
644 auth,
645 client,
646 capabilities: client_caps,
647 } = payload;
648
649 let negotiated = self.negotiate_capabilities(&client_caps);
650 let session_id = SessionId::new();
651 let mut state = SessionState::new(session_id.clone(), negotiated.clone());
652
653 let Some(authenticator) = self.inner.auth.get(&auth.scheme) else {
654 self.send_rejected(
655 out,
656 correlation_id,
657 ErrorCode::Unauthenticated,
658 format!("auth scheme {:?} not configured", auth.scheme),
659 )
660 .await;
661 state.phase = HandshakePhase::Closed;
662 return Ok(state);
663 };
664
665 let outcome = authenticator
666 .authenticate(&auth, &client, &negotiated)
667 .await?;
668
669 match outcome {
670 AuthOutcome::Accept { principal } => {
671 self.inner
672 .session_principals
673 .insert(session_id.clone(), Some(principal.clone()));
674 state.principal = Some(principal);
675 state.phase = HandshakePhase::Accepted;
676 self.send_accepted(out, correlation_id, &session_id, &negotiated)
677 .await;
678 }
679 AuthOutcome::Challenge { challenge } => {
680 state.active_challenge = Some(challenge.clone());
681 state.phase = HandshakePhase::Challenged;
682 let mut env = Envelope::new(MessageType::SessionChallenge(
683 arcp_core::messages::SessionChallengePayload {
684 challenge: challenge.clone(),
685 },
686 ));
687 env.correlation_id = Some(correlation_id);
688 env.session_id = Some(session_id);
689 let _ = out.send(env).await;
690 }
691 AuthOutcome::Reject { reason } => {
692 self.send_rejected(out, correlation_id, ErrorCode::Unauthenticated, reason)
693 .await;
694 state.phase = HandshakePhase::Closed;
695 }
696 }
697 Ok(state)
698 }
699
700 async fn handle_session_authenticate(
701 &self,
702 out: &mpsc::Sender<Envelope>,
703 correlation_id: MessageId,
704 state: &mut SessionState,
705 response: &str,
706 ) -> Result<(), ARCPError> {
707 let Some(challenge) = state.active_challenge.clone() else {
708 tracing::warn!("session.authenticate without active challenge; dropping");
709 return Ok(());
710 };
711 for scheme in [
712 arcp_core::messages::AuthScheme::Bearer,
713 arcp_core::messages::AuthScheme::SignedJwt,
714 ] {
715 let Some(authenticator) = self.inner.auth.get(&scheme) else {
716 continue;
717 };
718 let outcome = authenticator
719 .verify_challenge_response(&challenge, response)
720 .await?;
721 match outcome {
722 AuthOutcome::Accept { principal } => {
723 self.inner
724 .session_principals
725 .insert(state.session_id.clone(), Some(principal.clone()));
726 state.principal = Some(principal);
727 state.phase = HandshakePhase::Accepted;
728 state.active_challenge = None;
729 self.send_accepted(out, correlation_id, &state.session_id, &state.capabilities)
730 .await;
731 return Ok(());
732 }
733 AuthOutcome::Challenge { .. } | AuthOutcome::Reject { .. } => {}
734 }
735 }
736 let mut env = Envelope::new(MessageType::SessionUnauthenticated(
737 SessionUnauthenticatedPayload {
738 code: ErrorCode::Unauthenticated,
739 message: "challenge response did not validate".into(),
740 },
741 ));
742 env.correlation_id = Some(correlation_id);
743 env.session_id = Some(state.session_id.clone());
744 let _ = out.send(env).await;
745 Ok(())
746 }
747
748 #[allow(clippy::too_many_arguments, clippy::too_many_lines)]
749 async fn spawn_tool_invoke(
750 &self,
751 out: &mpsc::Sender<Envelope>,
752 jobs: &JobRegistry,
753 correlation_id: MessageId,
754 session_id: SessionId,
755 principal: Option<String>,
756 idempotency_key: Option<IdempotencyKey>,
757 payload: ToolInvokePayload,
758 ) {
759 let idempotency_scope = idempotency_key.as_ref().map(|key| IdempotencyScope {
764 principal_or_session: principal.clone().unwrap_or_else(|| session_id.to_string()),
765 idempotency_key: key.clone(),
766 });
767 let canonical_args = serde_json::to_string(&payload.arguments).unwrap_or_default();
768 if let Some(scope) = idempotency_scope.as_ref() {
769 if let Some(record) = self.inner.idempotency_index.get(scope) {
770 if record.tool == payload.tool && record.arguments_canonical == canonical_args {
771 let mut accepted =
772 Envelope::new(MessageType::JobAccepted(record.accepted.clone()));
773 accepted.correlation_id = Some(correlation_id);
774 accepted.session_id = Some(session_id);
775 accepted.job_id = Some(record.accepted.job_id.clone());
776 accepted.idempotency_key = idempotency_key;
777 let _ = out.send(accepted).await;
778 return;
779 }
780 let mut err = Envelope::new(MessageType::JobFailed(JobFailedPayload {
781 code: ErrorCode::FailedPrecondition,
782 retryable: Some(false),
783 message: format!(
784 "idempotency key {} already bound to a different command intent",
785 scope.idempotency_key
786 ),
787 details: None,
788 }));
789 err.correlation_id = Some(correlation_id);
790 err.session_id = Some(session_id);
791 err.idempotency_key = idempotency_key;
792 let _ = out.send(err).await;
793 return;
794 }
795 }
796 let job_id = JobId::new();
797
798 let agent_ref = match arcp_core::messages::AgentRef::parse(&payload.tool) {
801 Ok(r) => r,
802 Err(e) => {
803 let mut err = Envelope::new(MessageType::JobFailed(JobFailedPayload {
804 code: ErrorCode::InvalidArgument,
805 retryable: Some(false),
806 message: format!("invalid agent reference {}: {e}", payload.tool),
807 details: None,
808 }));
809 err.correlation_id = Some(correlation_id);
810 err.session_id = Some(session_id);
811 err.job_id = Some(job_id);
812 let _ = out.send(err).await;
813 return;
814 }
815 };
816 let lease = effective_lease(&payload);
817 let defer_accepted = self.inner.credential_provisioner.is_some() && lease.is_some();
818 let accepted_sent = if defer_accepted {
819 false
820 } else {
821 let mut accepted = Envelope::new(MessageType::JobAccepted(JobAcceptedPayload {
822 job_id: job_id.clone(),
823 credentials: vec![],
824 lease: lease.clone(),
825 }));
826 accepted.correlation_id = Some(correlation_id.clone());
827 accepted.session_id = Some(session_id.clone());
828 accepted.job_id = Some(job_id.clone());
829 let _ = out.send(accepted).await;
830 true
831 };
832
833 if agent_ref.version.is_some() {
836 let advertised = &self.inner.advertised_capabilities.agents;
837 let satisfied = advertised
838 .as_ref()
839 .is_some_and(|inv| inv.satisfies(&agent_ref));
840 if !satisfied {
841 let mut err = Envelope::new(MessageType::JobFailed(JobFailedPayload {
842 code: ErrorCode::AgentVersionNotAvailable,
843 retryable: Some(false),
844 message: format!("agent version not available: {}", agent_ref.format()),
845 details: None,
846 }));
847 err.correlation_id = Some(correlation_id);
848 err.session_id = Some(session_id);
849 err.job_id = Some(job_id);
850 let _ = out.send(err).await;
851 return;
852 }
853 }
854
855 let Some(handler) = self.inner.tools.get(&agent_ref.name) else {
856 let mut err = Envelope::new(MessageType::JobFailed(JobFailedPayload {
857 code: ErrorCode::NotFound,
858 retryable: Some(false),
859 message: format!("tool not registered: {}", agent_ref.name),
860 details: None,
861 }));
862 err.correlation_id = Some(correlation_id);
863 err.session_id = Some(session_id);
864 err.job_id = Some(job_id);
865 let _ = out.send(err).await;
866 return;
867 };
868
869 let credentials = if let (Some(provisioner), Some(lease_ref)) =
870 (&self.inner.credential_provisioner, lease.as_ref())
871 {
872 let ctx = CredentialJobContext {
873 job_id: job_id.clone(),
874 session_id: session_id.clone(),
875 principal: principal.clone(),
876 parent_job_id: None,
877 };
878 match provisioner.issue(lease_ref, &ctx).await {
879 Ok(credentials) => {
880 self.inner
881 .credential_ledger
882 .record_issued(&job_id, &credentials);
883 credentials
884 }
885 Err(e) => {
886 let mut err = Envelope::new(MessageType::JobFailed(JobFailedPayload {
887 code: e.code(),
888 retryable: Some(e.retryable()),
889 message: e.to_string(),
890 details: None,
891 }));
892 err.correlation_id = Some(correlation_id);
893 err.session_id = Some(session_id);
894 err.job_id = Some(job_id);
895 let _ = out.send(err).await;
896 return;
897 }
898 }
899 } else {
900 Vec::new()
901 };
902
903 if !accepted_sent {
905 let mut accepted = Envelope::new(MessageType::JobAccepted(JobAcceptedPayload {
906 job_id: job_id.clone(),
907 credentials: credentials.clone(),
908 lease: lease.clone(),
909 }));
910 accepted.correlation_id = Some(correlation_id.clone());
911 accepted.session_id = Some(session_id.clone());
912 accepted.job_id = Some(job_id.clone());
913 let _ = out.send(accepted).await;
914 }
915
916 let cancel = CancellationToken::new();
921 let entry = JobEntry {
922 job_id: job_id.clone(),
923 session_id: session_id.clone(),
924 correlation_id: correlation_id.clone(),
925 cancel: cancel.clone(),
926 state: JobState::Accepted,
927 agent: agent_ref.format(),
929 created_at: chrono::Utc::now(),
930 last_event_seq: 0,
931 parent_job_id: None,
932 credential_ids: self.inner.credential_ledger.outstanding_for_job(&job_id),
933 lease: lease.clone(),
934 };
935
936 let out_clone = out.clone();
937 let jobs_clone = jobs.clone();
938 let provisioner_clone = self.inner.credential_provisioner.clone();
939 let credential_ledger_clone = self.inner.credential_ledger.clone();
940 let cancel_for_task = cancel;
941 let budget_tracker = lease
945 .as_ref()
946 .and_then(|lease| lease.cost_budget.as_ref())
947 .map_or_else(crate::runtime::context::BudgetTracker::new, |budget| {
948 crate::runtime::context::BudgetTracker::from_budget(budget)
949 });
950
951 if let Some(scope) = idempotency_scope {
955 self.inner.idempotency_index.insert(
956 scope,
957 IdempotencyRecord {
958 accepted: JobAcceptedPayload {
959 job_id: job_id.clone(),
960 credentials: credentials.clone(),
961 lease: lease.clone(),
962 },
963 tool: agent_ref.format(),
964 arguments_canonical: canonical_args,
965 },
966 );
967 }
968
969 let join = tokio::spawn(async move {
970 let mut started = Envelope::new(MessageType::JobStarted(JobStartedPayload {
972 description: Some(format!("invoking {}", payload.tool)),
973 }));
974 started.correlation_id = Some(correlation_id.clone());
975 started.session_id = Some(session_id.clone());
976 started.job_id = Some(job_id.clone());
977 let _ = out_clone.send(started).await;
978 jobs_clone.set_state(&job_id, JobState::Running);
979
980 let ctx = ToolContext {
981 cancel: cancel_for_task.clone(),
982 job_id: job_id.clone(),
983 session_id: session_id.clone(),
984 correlation_id: correlation_id.clone(),
985 out: out_clone.clone(),
986 budget: budget_tracker,
987 lease,
988 };
989
990 let outcome = tokio::select! {
991 () = cancel_for_task.cancelled() => Outcome::Cancelled("cancellation token fired".into()),
992 result = handler.invoke(payload.arguments, ctx) => match result {
993 Ok(value) => Outcome::Completed(value),
994 Err(ARCPError::Cancelled { reason }) => Outcome::Cancelled(reason),
995 Err(e) => Outcome::Failed(e),
996 },
997 };
998
999 let terminal = match outcome {
1000 Outcome::Completed(value) => {
1001 jobs_clone.set_state(&job_id, JobState::Completed);
1002 let completed = streamed_result_from_value(value);
1009 MessageType::JobCompleted(completed)
1010 }
1011 Outcome::Failed(e) => {
1012 jobs_clone.set_state(&job_id, JobState::Failed);
1013 MessageType::JobFailed(JobFailedPayload {
1014 code: e.code(),
1015 retryable: Some(e.retryable()),
1016 message: e.to_string(),
1017 details: None,
1018 })
1019 }
1020 Outcome::Cancelled(reason) => {
1021 jobs_clone.set_state(&job_id, JobState::Cancelled);
1022 MessageType::JobCancelled(JobCancelledPayload {
1023 reason: Some(reason),
1024 })
1025 }
1026 };
1027 let mut term = Envelope::new(terminal);
1028 term.correlation_id = Some(correlation_id);
1029 term.session_id = Some(session_id);
1030 term.job_id = Some(job_id.clone());
1031 let _ = out_clone.send(term).await;
1032 if let Some(provisioner) = provisioner_clone.as_ref() {
1033 if let Err(e) =
1034 revoke_all_for_job(&credential_ledger_clone, provisioner, &job_id).await
1035 {
1036 tracing::warn!(error = %e, job_id = %job_id, "failed to revoke credentials");
1037 }
1038 }
1039 });
1040
1041 jobs.insert(entry, join);
1042 }
1043
1044 async fn handle_cancel(
1045 &self,
1046 out: &mpsc::Sender<Envelope>,
1047 jobs: &JobRegistry,
1048 correlation_id: MessageId,
1049 requester: &SessionState,
1050 payload: &CancelPayload,
1051 ) {
1052 let CancelPayload {
1053 target, target_id, ..
1054 } = payload;
1055 match target {
1056 CancelTargetKind::Job => {
1057 #[allow(clippy::option_if_let_else)] let response_payload = if let Ok(job_id) = target_id.parse::<JobId>() {
1059 if let Some(snap) = jobs.snapshot(&job_id) {
1060 let authorized = snap.session_id == requester.session_id
1066 || cancel_principal_matches(
1067 &self.inner.session_principals,
1068 &snap.session_id,
1069 requester.principal.as_deref(),
1070 );
1071 if authorized {
1072 if jobs.cancel(&job_id) {
1073 MessageType::CancelAccepted(
1074 arcp_core::messages::CancelAcceptedPayload {
1075 target_id: Some(target_id.clone()),
1076 },
1077 )
1078 } else {
1079 MessageType::CancelRefused(
1080 arcp_core::messages::CancelRefusedPayload {
1081 target_id: target_id.clone(),
1082 reason: "job is no longer in-flight".into(),
1083 },
1084 )
1085 }
1086 } else {
1087 MessageType::CancelRefused(arcp_core::messages::CancelRefusedPayload {
1088 target_id: target_id.clone(),
1089 reason: "permission denied: not authorized to cancel this job"
1090 .into(),
1091 })
1092 }
1093 } else {
1094 MessageType::CancelRefused(arcp_core::messages::CancelRefusedPayload {
1095 target_id: target_id.clone(),
1096 reason: "no such in-flight job".into(),
1097 })
1098 }
1099 } else {
1100 MessageType::CancelRefused(arcp_core::messages::CancelRefusedPayload {
1101 target_id: target_id.clone(),
1102 reason: "malformed job id".into(),
1103 })
1104 };
1105 let mut env = Envelope::new(response_payload);
1106 env.correlation_id = Some(correlation_id);
1107 env.session_id = Some(requester.session_id.clone());
1108 let _ = out.send(env).await;
1109 }
1110 CancelTargetKind::Stream | CancelTargetKind::Session => {
1111 tracing::warn!(?target, "cancel target not yet implemented");
1112 }
1113 }
1114 }
1115
    /// Computes the capability set for a session from the runtime's advertised
    /// capabilities and the client's requested ones.
    ///
    /// - Boolean capabilities are combined with `intersect_bool`. Presumably a
    ///   flag is enabled only when both sides enable it; confirm against its
    ///   definition.
    /// - `heartbeat_recovery`, `binary_encoding`, `artifact_retention` and
    ///   `agents` are copied from the runtime side unchanged.
    /// - `extensions` keeps the runtime's extensions, in runtime order, that
    ///   the client also lists.
    /// - `extra` is always empty.
    fn negotiate_capabilities(&self, client_caps: &Capabilities) -> Capabilities {
        let runtime_caps = &self.inner.advertised_capabilities;
        Capabilities {
            streaming: intersect_bool(runtime_caps.streaming, client_caps.streaming),
            durable_jobs: intersect_bool(runtime_caps.durable_jobs, client_caps.durable_jobs),
            checkpoints: intersect_bool(runtime_caps.checkpoints, client_caps.checkpoints),
            binary_streams: intersect_bool(runtime_caps.binary_streams, client_caps.binary_streams),
            agent_handoff: intersect_bool(runtime_caps.agent_handoff, client_caps.agent_handoff),
            model_use: intersect_bool(runtime_caps.model_use, client_caps.model_use),
            provisioned_credentials: intersect_bool(
                runtime_caps.provisioned_credentials,
                client_caps.provisioned_credentials,
            ),
            artifacts: intersect_bool(runtime_caps.artifacts, client_caps.artifacts),
            subscriptions: intersect_bool(runtime_caps.subscriptions, client_caps.subscriptions),
            scheduled_jobs: intersect_bool(runtime_caps.scheduled_jobs, client_caps.scheduled_jobs),
            interrupt: intersect_bool(runtime_caps.interrupt, client_caps.interrupt),
            anonymous: intersect_bool(runtime_caps.anonymous, client_caps.anonymous),
            // Runtime-defined settings: the client's values are not consulted.
            heartbeat_recovery: runtime_caps.heartbeat_recovery.clone(),
            binary_encoding: runtime_caps.binary_encoding.clone(),
            // Runtime extensions that the client also advertises.
            extensions: runtime_caps
                .extensions
                .iter()
                .filter(|e| client_caps.extensions.contains(e))
                .cloned()
                .collect(),
            artifact_retention: runtime_caps.artifact_retention.clone(),
            agents: runtime_caps.agents.clone(),
            extra: std::collections::BTreeMap::new(),
        }
    }
1152
1153 async fn send_accepted(
1154 &self,
1155 out: &mpsc::Sender<Envelope>,
1156 correlation_id: MessageId,
1157 session_id: &SessionId,
1158 capabilities: &Capabilities,
1159 ) {
1160 let lease = self.inner.session_lease_seconds.map(|s| SessionLease {
1161 expires_at: chrono::Utc::now()
1162 + chrono::Duration::seconds(i64::try_from(s).unwrap_or(i64::MAX)),
1163 });
1164 let mut env = Envelope::new(MessageType::SessionAccepted(SessionAcceptedPayload {
1165 session_id: session_id.clone(),
1166 runtime: self.inner.runtime_identity.clone(),
1167 capabilities: capabilities.clone(),
1168 lease,
1169 }));
1170 env.correlation_id = Some(correlation_id);
1171 env.session_id = Some(session_id.clone());
1172 let _ = out.send(env).await;
1173 }
1174
1175 async fn send_rejected(
1176 &self,
1177 out: &mpsc::Sender<Envelope>,
1178 correlation_id: MessageId,
1179 code: ErrorCode,
1180 message: String,
1181 ) {
1182 let mut env = Envelope::new(MessageType::SessionRejected(SessionRejectedPayload {
1183 code,
1184 message,
1185 }));
1186 env.correlation_id = Some(correlation_id);
1187 let _ = out.send(env).await;
1188 }
1189
1190 async fn handle_subscribe(
1191 out: &mpsc::Sender<Envelope>,
1192 manager: &SubscriptionManager,
1193 connection_subs: &Arc<DashMap<SubscriptionId, JoinHandle<()>>>,
1194 correlation_id: MessageId,
1195 session_id: SessionId,
1196 payload: SubscribePayload,
1197 ) {
1198 let SubscribePayload { filter, since: _ } = payload;
1199 let (subscription_id, mut rx) = manager.register(filter, session_id.clone());
1202 let mut accepted =
1204 Envelope::new(MessageType::SubscribeAccepted(SubscribeAcceptedPayload {
1205 subscription_id: subscription_id.clone(),
1206 }));
1207 accepted.correlation_id = Some(correlation_id);
1208 accepted.session_id = Some(session_id);
1209 accepted.subscription_id = Some(subscription_id.clone());
1210 let _ = out.send(accepted).await;
1211
1212 let out_clone = out.clone();
1216 let sub_id = subscription_id.clone();
1217 let join = tokio::spawn(async move {
1218 while let Some(event) = rx.next().await {
1219 let value = match serde_json::to_value(&event) {
1220 Ok(v) => v,
1221 Err(e) => {
1222 tracing::warn!(error = %e, "subscribe.event serialise failed");
1223 continue;
1224 }
1225 };
1226 let mut wrapper =
1227 Envelope::new(MessageType::SubscribeEvent(SubscribeEventPayload {
1228 event: value,
1229 }));
1230 wrapper.subscription_id = Some(sub_id.clone());
1231 if out_clone.send(wrapper).await.is_err() {
1232 break;
1233 }
1234 }
1235 });
1236 connection_subs.insert(subscription_id, join);
1237 }
1238
1239 #[allow(clippy::too_many_arguments)]
1240 async fn handle_job_subscribe(
1241 out: &mpsc::Sender<Envelope>,
1242 manager: &SubscriptionManager,
1243 jobs: &JobRegistry,
1244 session_principals: &DashMap<SessionId, Option<String>>,
1245 connection_job_subs: &Arc<DashMap<JobId, JoinHandle<()>>>,
1246 correlation_id: MessageId,
1247 subscriber_session: SessionId,
1248 subscriber_principal: Option<String>,
1249 payload: JobSubscribePayload,
1250 ) {
1251 let JobSubscribePayload {
1252 job_id,
1253 from_event_seq: _,
1254 history: _,
1255 } = payload;
1256
1257 let Some(snap) = jobs.snapshot(&job_id) else {
1258 let mut err = Envelope::new(MessageType::Nack(NackPayload {
1259 code: ErrorCode::NotFound,
1260 message: format!("no such job: {job_id}"),
1261 details: None,
1262 }));
1263 err.correlation_id = Some(correlation_id);
1264 err.session_id = Some(subscriber_session);
1265 let _ = out.send(err).await;
1266 return;
1267 };
1268
1269 if snap.session_id != subscriber_session {
1273 let submitter_principal = session_principals
1274 .get(&snap.session_id)
1275 .and_then(|p| p.value().clone());
1276 let permitted = match (&submitter_principal, &subscriber_principal) {
1277 (Some(a), Some(b)) => a == b,
1278 _ => false,
1279 };
1280 if !permitted {
1281 let mut err = Envelope::new(MessageType::Nack(NackPayload {
1282 code: ErrorCode::PermissionDenied,
1283 message: "principal not authorized to subscribe to this job".into(),
1284 details: None,
1285 }));
1286 err.correlation_id = Some(correlation_id);
1287 err.session_id = Some(subscriber_session);
1288 err.job_id = Some(job_id);
1289 let _ = out.send(err).await;
1290 return;
1291 }
1292 }
1293
1294 let filter = arcp_core::messages::SubscriptionFilter {
1296 job_id: vec![job_id.clone()],
1297 ..arcp_core::messages::SubscriptionFilter::default()
1298 };
1299 let (_internal_id, mut rx) = manager.register(filter, subscriber_session.clone());
1300
1301 let ack = JobSubscribedPayload {
1303 job_id: job_id.clone(),
1304 current_status: snap.state.wire_str().to_owned(),
1305 agent: snap.agent.clone(),
1306 parent_job_id: snap.parent_job_id.clone(),
1307 trace_id: None,
1308 subscribed_from: snap.last_event_seq,
1309 replayed: false,
1313 };
1314 let mut ack_env = Envelope::new(MessageType::JobSubscribed(ack));
1315 ack_env.correlation_id = Some(correlation_id);
1316 ack_env.session_id = Some(subscriber_session.clone());
1317 ack_env.job_id = Some(job_id.clone());
1318 let _ = out.send(ack_env).await;
1319
1320 let out_clone = out.clone();
1325 let subscriber_session_clone = subscriber_session;
1326 let job_id_clone = job_id.clone();
1327 let connection_job_subs_clone = Arc::clone(connection_job_subs);
1328 let join = tokio::spawn(async move {
1329 while let Some(mut env) = rx.next().await {
1330 if !is_forwardable_job_event(&env.payload) {
1334 continue;
1335 }
1336 env.session_id = Some(subscriber_session_clone.clone());
1337 if out_clone.send(env).await.is_err() {
1338 break;
1339 }
1340 }
1341 connection_job_subs_clone.remove(&job_id_clone);
1343 });
1344 connection_job_subs.insert(job_id, join);
1345 }
1346
1347 async fn handle_artifact_put(
1348 out: &mpsc::Sender<Envelope>,
1349 store: &ArtifactStore,
1350 correlation_id: MessageId,
1351 session_id: SessionId,
1352 payload: ArtifactPutPayload,
1353 ) {
1354 let ArtifactPutPayload {
1355 media_type,
1356 data,
1357 sha256,
1358 retain_seconds,
1359 } = payload;
1360 let mut env = match store.put(media_type, &data, retain_seconds, sha256) {
1361 Ok(reference) => Envelope::new(MessageType::ArtifactRef(ArtifactRefPayload {
1362 artifact: reference,
1363 })),
1364 Err(e) => Envelope::new(MessageType::Nack(NackPayload {
1365 code: e.code(),
1366 message: e.to_string(),
1367 details: None,
1368 })),
1369 };
1370 env.correlation_id = Some(correlation_id);
1371 env.session_id = Some(session_id);
1372 let _ = out.send(env).await;
1373 }
1374
1375 async fn handle_artifact_fetch(
1376 out: &mpsc::Sender<Envelope>,
1377 store: &ArtifactStore,
1378 correlation_id: MessageId,
1379 session_id: SessionId,
1380 payload: ArtifactFetchPayload,
1381 ) {
1382 let ArtifactFetchPayload { artifact_id } = payload;
1383 let mut env = match store.fetch(&artifact_id) {
1384 Ok((data, media_type)) => Envelope::new(MessageType::ArtifactPut(ArtifactPutPayload {
1385 media_type,
1386 data,
1387 sha256: None,
1388 retain_seconds: None,
1389 })),
1390 Err(e) => Envelope::new(MessageType::Nack(NackPayload {
1391 code: e.code(),
1392 message: e.to_string(),
1393 details: None,
1394 })),
1395 };
1396 env.correlation_id = Some(correlation_id);
1397 env.session_id = Some(session_id);
1398 let _ = out.send(env).await;
1399 }
1400}
1401
/// Result of running a job, as one of three cases: completed with a JSON value,
/// failed with an `ARCPError`, or cancelled with a string.
///
/// NOTE(review): presumably each case is turned into the matching
/// job.completed / job.failed / job.cancelled message by the caller; confirm at
/// the use site.
enum Outcome {
    /// Completed; carries the result value.
    Completed(serde_json::Value),
    /// Failed; carries the error.
    Failed(ARCPError),
    /// Cancelled; the string is presumably the cancellation reason (TODO confirm).
    Cancelled(String),
}
1407
1408fn effective_lease(payload: &ToolInvokePayload) -> Option<LeaseRequest> {
1409 if let Some(lease) = payload.lease_request.clone() {
1410 return Some(lease);
1411 }
1412 payload.cost_budget.clone().map(|cost_budget| LeaseRequest {
1413 cost_budget: Some(cost_budget),
1414 model_use: None,
1415 expires_at: None,
1416 extra: std::collections::BTreeMap::new(),
1417 })
1418}
1419
1420fn redact_for_subscribers(env: &Envelope) -> Envelope {
1421 let mut out = env.clone();
1422 if let MessageType::JobAccepted(payload) = &mut out.payload {
1423 payload.credentials.clear();
1424 }
1425 out
1426}
1427
/// Key that marks a job result value as streamed rather than inline.
///
/// A completion value that is a JSON object whose only key is this sentinel,
/// e.g. `{"$arcp_streamed_result": {"result_id": "...", "result_size": 123, "summary": "..."}}`,
/// is turned by `streamed_result_from_value` into a `JobCompletedPayload` that
/// references the streamed result instead of embedding the value.
pub const STREAMED_RESULT_SENTINEL: &str = "$arcp_streamed_result";
1435
1436fn streamed_result_from_value(value: serde_json::Value) -> JobCompletedPayload {
1439 if let Some(obj) = value.as_object() {
1440 if obj.len() == 1 {
1441 if let Some(inner) = obj.get(STREAMED_RESULT_SENTINEL) {
1442 let result_id = inner
1443 .get("result_id")
1444 .and_then(|v| v.as_str())
1445 .map(String::from);
1446 let result_size = inner.get("result_size").and_then(serde_json::Value::as_u64);
1447 let summary = inner
1448 .get("summary")
1449 .and_then(|v| v.as_str())
1450 .map(String::from);
1451 if result_id.is_some() {
1452 return JobCompletedPayload {
1453 value: None,
1454 result_ref: None,
1455 result_id,
1456 result_size,
1457 summary,
1458 };
1459 }
1460 }
1461 }
1462 }
1463 JobCompletedPayload {
1464 value: Some(value),
1465 result_ref: None,
1466 result_id: None,
1467 result_size: None,
1468 summary: None,
1469 }
1470}
1471
1472const fn is_forwardable_job_event(payload: &MessageType) -> bool {
1478 matches!(
1479 payload,
1480 MessageType::JobAccepted(_)
1481 | MessageType::JobStarted(_)
1482 | MessageType::JobProgress(_)
1483 | MessageType::JobHeartbeat(_)
1484 | MessageType::JobCompleted(_)
1485 | MessageType::JobFailed(_)
1486 | MessageType::JobCancelled(_)
1487 | MessageType::JobResultChunk(_)
1488 | MessageType::ToolResult(_)
1489 | MessageType::ToolError(_)
1490 | MessageType::Log(_)
1491 | MessageType::Metric(_)
1492 | MessageType::StreamOpen(_)
1493 | MessageType::StreamChunk(_)
1494 | MessageType::StreamClose(_)
1495 | MessageType::StreamError(_)
1496 | MessageType::ArtifactRef(_)
1497 )
1498}
1499
1500fn cancel_principal_matches(
1505 session_principals: &DashMap<SessionId, Option<String>>,
1506 owning_session: &SessionId,
1507 requester_principal: Option<&str>,
1508) -> bool {
1509 let Some(requester_principal) = requester_principal else {
1510 return false;
1511 };
1512 session_principals
1513 .get(owning_session)
1514 .and_then(|p| p.value().clone())
1515 .is_some_and(|owner| owner == requester_principal)
1516}
1517
/// Intersects one boolean capability flag advertised by the runtime (`a`) and the
/// client (`b`).
///
/// The result is `Some(true)` only when both sides advertise `true`. If at least
/// one side states the flag, every other combination yields `Some(false)`. That
/// includes one side omitting it. The result stays `None` only when neither side
/// mentions the flag.
const fn intersect_bool(a: Option<bool>, b: Option<bool>) -> Option<bool> {
    if a.is_none() && b.is_none() {
        return None;
    }
    Some(matches!((a, b), (Some(true), Some(true))))
}