Skip to main content

arcp_runtime/runtime/
server.rs

1//! ARCP runtime — the server side that drives the handshake (RFC §8.1)
2//! and dispatches subsequent envelopes.
3
4use std::collections::HashSet;
5use std::sync::atomic::{AtomicU64, Ordering};
6use std::sync::Arc;
7
8use dashmap::DashMap;
9use tokio::sync::{mpsc, Notify};
10use tokio::task::JoinHandle;
11use tokio_util::sync::CancellationToken;
12
13use super::artifact::ArtifactStore;
14use super::context::ToolContext;
15use super::credentials::{
16    revoke_all_for_job, CredentialJobContext, CredentialLedger, CredentialProvisioner,
17};
18use super::job::{JobEntry, JobRegistry};
19use super::session::{HandshakePhase, SessionState};
20use super::subscription::SubscriptionManager;
21use super::tools::ToolRegistry;
22use crate::store::eventlog::EventLog;
23use arcp_core::auth::{AuthOutcome, AuthRegistry, Authenticator};
24use arcp_core::envelope::Envelope;
25use arcp_core::error::{ARCPError, ErrorCode};
26use arcp_core::extensions::ExtensionRegistry;
27use arcp_core::ids::IdempotencyKey;
28use arcp_core::ids::SubscriptionId;
29use arcp_core::ids::{JobId, MessageId, SessionId};
30use arcp_core::messages::{
31    ArtifactFetchPayload, ArtifactPutPayload, ArtifactRefPayload, ArtifactReleasePayload,
32    CancelPayload, CancelTargetKind, Capabilities, JobAcceptedPayload, JobCancelledPayload,
33    JobCompletedPayload, JobFailedPayload, JobStartedPayload, JobState, JobSubscribePayload,
34    JobSubscribedPayload, JobUnsubscribePayload, LeaseRequest, MessageType, NackPayload,
35    RuntimeIdentity, SessionAcceptedPayload, SessionLease, SessionOpenPayload,
36    SessionRejectedPayload, SessionUnauthenticatedPayload, SubscribeAcceptedPayload,
37    SubscribeEventPayload, SubscribePayload, ToolInvokePayload, UnsubscribePayload,
38};
39use arcp_core::transport::Transport;
40use arcp_core::{IMPL_KIND, IMPL_VERSION};
41
/// Runtime configuration.
///
/// Collects the settings consumed by [`RuntimeBuilder::build`]. Each field
/// is populated through the matching `with_*` method.
pub struct RuntimeBuilder {
    /// Registered authenticators (one per scheme).
    auth: AuthRegistry,
    /// Tool registry handed to the runtime.
    tools: ToolRegistry,
    /// Capability set the runtime advertises.
    advertised_capabilities: Capabilities,
    /// Identity reported for this runtime.
    runtime_identity: RuntimeIdentity,
    /// Default session lease length, in seconds.
    session_lease_seconds: Option<u64>,
    /// Size of the `session.ack` sliding window; `None` disables
    /// window-based flow control.
    ack_window: Option<u64>,
    /// Optional provisioner for lease-bound credentials.
    credential_provisioner: Option<Arc<dyn CredentialProvisioner>>,
}
52
impl Default for RuntimeBuilder {
    /// Equivalent to [`RuntimeBuilder::new`].
    fn default() -> Self {
        Self::new()
    }
}
58
59impl std::fmt::Debug for RuntimeBuilder {
60    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
61        f.debug_struct("RuntimeBuilder")
62            .field("advertised_capabilities", &self.advertised_capabilities)
63            .field("runtime_identity", &self.runtime_identity)
64            .field("session_lease_seconds", &self.session_lease_seconds)
65            .finish_non_exhaustive()
66    }
67}
68
69impl RuntimeBuilder {
70    /// New builder with empty auth registry, default capabilities, and the
71    /// crate's identity (`arcp-rs`).
72    #[must_use]
73    pub fn new() -> Self {
74        Self {
75            auth: AuthRegistry::new(),
76            tools: ToolRegistry::new(),
77            advertised_capabilities: Capabilities::default(),
78            runtime_identity: RuntimeIdentity {
79                kind: IMPL_KIND.to_owned(),
80                version: IMPL_VERSION.to_owned(),
81                fingerprint: None,
82                trust_level: Some("trusted".into()),
83            },
84            session_lease_seconds: Some(3600),
85            ack_window: None,
86            credential_provisioner: None,
87        }
88    }
89
90    /// Register one authenticator. Multiple may be added (one per scheme).
91    #[must_use]
92    pub fn with_authenticator(mut self, auth: Box<dyn Authenticator>) -> Self {
93        self.auth.register(auth);
94        self
95    }
96
97    /// Set the tool registry (replaces any previously set).
98    #[must_use]
99    pub fn with_tools(mut self, tools: ToolRegistry) -> Self {
100        self.tools = tools;
101        self
102    }
103
104    /// Set the capability set the runtime advertises.
105    #[must_use]
106    pub fn with_capabilities(mut self, caps: Capabilities) -> Self {
107        self.advertised_capabilities = caps;
108        self
109    }
110
111    /// Override the runtime identity.
112    #[must_use]
113    pub fn with_identity(mut self, ident: RuntimeIdentity) -> Self {
114        self.runtime_identity = ident;
115        self
116    }
117
118    /// Override the default session lease length.
119    #[must_use]
120    pub const fn with_session_lease_seconds(mut self, seconds: u64) -> Self {
121        self.session_lease_seconds = Some(seconds);
122        self
123    }
124
125    /// Set the size of the `session.ack` sliding window (ARCP v1.1 §6.5).
126    ///
127    /// When set, the writer will pause outbound countable envelopes once
128    /// `emitted - last_processed_seq >= window` and resume on the next
129    /// `session.ack`. Set to `None` (default) to disable window-based
130    /// flow control entirely.
131    ///
132    /// A window of `0` makes the gate immediately unsatisfiable for the
133    /// very first countable event and is normalized to `None`
134    /// (disabled) rather than installing a guaranteed deadlock.
135    #[must_use]
136    pub const fn with_ack_window(mut self, window: u64) -> Self {
137        self.ack_window = if window == 0 { None } else { Some(window) };
138        self
139    }
140
141    /// Register a provisioner for ARCP v1.1 lease-bound credentials.
142    #[must_use]
143    pub fn with_credential_provisioner(
144        mut self,
145        provisioner: Arc<dyn CredentialProvisioner>,
146    ) -> Self {
147        self.credential_provisioner = Some(provisioner);
148        self.advertised_capabilities.model_use = Some(true);
149        self.advertised_capabilities.provisioned_credentials = Some(true);
150        self
151    }
152
153    /// Construct an [`ARCPRuntime`] sharing this configuration. The
154    /// returned runtime is cheap to clone.
155    ///
156    /// # Errors
157    ///
158    /// Returns [`ARCPError::Storage`] if the in-memory event log cannot be
159    /// initialised (extremely unlikely; signals `SQLite` link failure).
160    pub async fn build(self) -> Result<ARCPRuntime, ARCPError> {
161        if self.advertised_capabilities.provisioned_credentials == Some(true)
162            && self.credential_provisioner.is_none()
163        {
164            return Err(ARCPError::FailedPrecondition {
165                detail: "provisioned_credentials advertised without a CredentialProvisioner".into(),
166            });
167        }
168        let event_log = EventLog::in_memory().await?;
169        Ok(ARCPRuntime {
170            inner: Arc::new(RuntimeInner {
171                auth: self.auth,
172                tools: self.tools,
173                advertised_capabilities: self.advertised_capabilities,
174                runtime_identity: self.runtime_identity,
175                session_lease_seconds: self.session_lease_seconds,
176                ack_window: self.ack_window,
177                extension_registry: ExtensionRegistry::new(),
178                event_log,
179                artifacts: ArtifactStore::new(),
180                subscriptions: SubscriptionManager::new(),
181                jobs: JobRegistry::new(),
182                session_principals: DashMap::new(),
183                credential_provisioner: self.credential_provisioner,
184                credential_ledger: CredentialLedger::new(),
185                idempotency_index: DashMap::new(),
186            }),
187        })
188    }
189}
190
/// Shared state behind [`ARCPRuntime`]; held in an `Arc` so runtime clones
/// all see the same registries and stores.
struct RuntimeInner {
    /// Authenticators looked up by scheme during the handshake.
    auth: AuthRegistry,
    /// Tool registry supplied by the builder.
    tools: ToolRegistry,
    /// Capability set the runtime advertises.
    advertised_capabilities: Capabilities,
    /// Identity reported for this runtime.
    runtime_identity: RuntimeIdentity,
    /// Default session lease length, in seconds.
    session_lease_seconds: Option<u64>,
    /// Size of the `session.ack` sliding window, in countable events.
    /// `None` disables window-based flow control (default).
    ack_window: Option<u64>,
    // NOTE(review): constructed in `build` but not read anywhere in the
    // visible part of this file, hence the lint suppression.
    #[allow(dead_code)]
    extension_registry: ExtensionRegistry,
    /// Event log that inbound and outbound envelopes are appended to.
    event_log: EventLog,
    /// Artifact store backing the `artifact.*` messages.
    artifacts: ArtifactStore,
    /// Subscription manager that envelopes are published to.
    subscriptions: SubscriptionManager,
    /// Runtime-wide job registry. Shared across connections so a
    /// `job.subscribe` (ARCP v1.1 §7.6) from a different session can
    /// observe jobs submitted elsewhere.
    jobs: JobRegistry,
    /// Per-session authenticated principal. Used by `job.subscribe`
    /// authorization (default policy: same-principal as the submitter).
    session_principals: DashMap<SessionId, Option<String>>,
    /// Optional provisioner for lease-bound upstream credentials.
    credential_provisioner: Option<Arc<dyn CredentialProvisioner>>,
    /// Runtime ledger of outstanding credential ids.
    credential_ledger: CredentialLedger,
    /// Logical idempotency index for `tool.invoke` (ARCP v1.1 §6.4).
    /// Keyed by `(principal-or-session, idempotency_key)`; resolves a
    /// repeat command intent to the original `JobAccepted` payload so
    /// retries return the same `job_id` instead of starting a duplicate
    /// job.
    idempotency_index: DashMap<IdempotencyScope, IdempotencyRecord>,
}
223
/// Scope key for logical idempotency. Authenticated requests scope by
/// principal so a retry across a reconnect resolves to the same job;
/// anonymous sessions fall back to the session id.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
struct IdempotencyScope {
    /// The authenticated principal, or the session id for anonymous
    /// sessions.
    principal_or_session: String,
    /// Client-supplied idempotency key for the command.
    idempotency_key: IdempotencyKey,
}
232
/// Value stored in the idempotency index for one accepted `tool.invoke`.
#[derive(Debug, Clone)]
struct IdempotencyRecord {
    /// The original `JobAccepted` payload that retries resolve to.
    accepted: JobAcceptedPayload,
    /// Tool named by the original request.
    // NOTE(review): presumably compared against a retry to detect a
    // conflicting payload under the same key; confirm in `spawn_tool_invoke`.
    tool: String,
    /// Canonical form of the original request's arguments.
    // NOTE(review): the canonicalisation scheme is not visible here; confirm.
    arguments_canonical: String,
}
239
/// The ARCP runtime. Cheap to clone; share across tasks.
///
/// Cloning only copies the `Arc` handle, so every clone shares the same
/// underlying state.
#[derive(Clone)]
pub struct ARCPRuntime {
    /// Shared runtime state.
    inner: Arc<RuntimeInner>,
}
245
246impl std::fmt::Debug for ARCPRuntime {
247    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
248        f.debug_struct("ARCPRuntime").finish_non_exhaustive()
249    }
250}
251
252impl ARCPRuntime {
    /// Construct via [`RuntimeBuilder`].
    ///
    /// Shorthand for [`RuntimeBuilder::new`].
    #[must_use]
    pub fn builder() -> RuntimeBuilder {
        RuntimeBuilder::new()
    }
258
    /// Borrow the runtime's event log (shared by every clone of this
    /// runtime).
    #[must_use]
    pub fn event_log(&self) -> &EventLog {
        &self.inner.event_log
    }
264
    /// Borrow the runtime's artifact store (shared by every clone of this
    /// runtime).
    #[must_use]
    pub fn artifacts(&self) -> &ArtifactStore {
        &self.inner.artifacts
    }
270
    /// Borrow the runtime's subscription manager (shared by every clone of
    /// this runtime).
    #[must_use]
    pub fn subscriptions(&self) -> &SubscriptionManager {
        &self.inner.subscriptions
    }
276
277    /// Spawn a per-connection task that drives the handshake and then
278    /// dispatches subsequent envelopes. The returned [`JoinHandle`] is
279    /// owned by the caller — Phase 2 doesn't yet integrate with a
280    /// connection registry.
281    #[must_use]
282    pub fn serve_connection<T: Transport + 'static>(&self, transport: T) -> JoinHandle<()> {
283        let runtime = self.clone();
284        tokio::spawn(async move {
285            if let Err(e) = runtime.run_connection(transport).await {
286                tracing::warn!(error = %e, "connection terminated with error");
287            }
288        })
289    }
290
291    /// Drive one connection synchronously (in the caller's task).
292    ///
293    /// # Errors
294    ///
295    /// Returns [`ARCPError`] for transport / serialisation failures or for
296    /// internal protocol errors (rare).
297    #[allow(clippy::too_many_lines, clippy::cognitive_complexity)]
298    pub async fn run_connection<T: Transport + 'static>(
299        &self,
300        transport: T,
301    ) -> Result<(), ARCPError> {
302        let transport = Arc::new(transport);
303        // Out-going envelope channel — both the dispatcher and per-job
304        // tasks publish here. A dedicated writer task owns the transport
305        // send side so we never have two callers contending on it.
306        let (out_tx, mut out_rx) = mpsc::channel::<Envelope>(256);
307        let writer_transport = Arc::clone(&transport);
308        let event_log = self.inner.event_log.clone();
309        let writer_subs = self.inner.subscriptions.clone();
310        // ARCP v1.1 §6.5 flow-control state. `emitted` increments per
311        // countable outbound envelope; `last_ack` is updated when the
312        // client sends `session.ack`. The writer waits on `ack_notify`
313        // when the in-flight window is full.
314        let ack_window = self.inner.ack_window;
315        let emitted = Arc::new(AtomicU64::new(0));
316        let last_ack = Arc::new(AtomicU64::new(0));
317        let ack_notify = Arc::new(Notify::new());
318        let writer_emitted = Arc::clone(&emitted);
319        let writer_last_ack = Arc::clone(&last_ack);
320        let writer_ack_notify = Arc::clone(&ack_notify);
321        let writer_jobs = self.inner.jobs.clone();
322        let writer = tokio::spawn(async move {
323            while let Some(mut env) = out_rx.recv().await {
324                // Flow control (§6.5): for countable events, gate on the
325                // sliding window BEFORE persistence / publishing so an
326                // envelope blocked by backpressure isn't logged as
327                // already delivered. Non-countable envelopes (handshake,
328                // heartbeat, ack, control) bypass the gate.
329                let is_countable = env.payload.is_countable_event();
330                if is_countable {
331                    if let Some(window) = ack_window {
332                        loop {
333                            let in_flight = writer_emitted
334                                .load(Ordering::Acquire)
335                                .saturating_sub(writer_last_ack.load(Ordering::Acquire));
336                            if in_flight < window {
337                                break;
338                            }
339                            // Wait for either a new ack or for the
340                            // channel to close. run_connection drops
341                            // out_tx and then notifies us so we can
342                            // observe the closed channel and exit
343                            // instead of parking forever (§6.5).
344                            writer_ack_notify.notified().await;
345                            if out_rx.is_closed() {
346                                return;
347                            }
348                        }
349                    }
350                    // Stamp the session-scoped sequence number (§6.5 /
351                    // §6.6). For job-scoped events, also bump the
352                    // job's high-water mark so session.list_jobs and
353                    // job.subscribed report the actual last value the
354                    // subscriber can ack from.
355                    let seq = writer_emitted.fetch_add(1, Ordering::AcqRel) + 1;
356                    env.event_seq = Some(seq);
357                    if let Some(job_id) = env.job_id.as_ref() {
358                        writer_jobs.record_event_seq(job_id, seq);
359                    }
360                }
361                if let Err(e) = event_log.append(&env).await {
362                    tracing::warn!(error = %e, "failed to persist outbound envelope");
363                }
364                // Publish outbound envelopes too so subscribers see
365                // job.* / tool.* / stream.* events that originate on the
366                // server side (RFC §13). Skip subscribe.event itself so
367                // the wrapper isn't re-broadcast, which would cause an
368                // echo storm whenever a filter matches subscribe.event.
369                if !matches!(env.payload, MessageType::SubscribeEvent(_)) {
370                    let publish_env = redact_for_subscribers(&env);
371                    let _ = writer_subs.publish(&publish_env);
372                }
373                if let Err(e) = writer_transport.send(env).await {
374                    tracing::warn!(error = %e, "transport send failed; closing writer");
375                    break;
376                }
377            }
378        });
379
380        let jobs = self.inner.jobs.clone();
381        // Subscriptions owned by this connection, so we can drop them on
382        // close even if the SubscriptionManager is shared across sessions.
383        let connection_subs: Arc<DashMap<SubscriptionId, JoinHandle<()>>> =
384            Arc::new(DashMap::new());
385        // Per-connection `job.subscribe` (ARCP v1.1 §7.6) forwarders,
386        // keyed by `job_id`.
387        let connection_job_subs: Arc<DashMap<JobId, JoinHandle<()>>> = Arc::new(DashMap::new());
388        let mut state: Option<SessionState> = None;
389        let mut seen_ids: HashSet<MessageId> = HashSet::new();
390        // ARCP v1.1 durable-job semantics (§10.1, README §"Reconnect"):
391        // a normal transport drop must NOT cancel in-flight jobs. We
392        // only tear down jobs when the client sends `session.close`.
393        let mut explicit_close = false;
394
395        let result = loop {
396            let Some(envelope) = transport.recv().await? else {
397                break Ok(());
398            };
399
400            // Transport-level idempotency check.
401            if !seen_ids.insert(envelope.id.clone()) {
402                tracing::debug!(id = %envelope.id, "dropping replayed envelope");
403                continue;
404            }
405
406            // Persist incoming envelope.
407            self.inner.event_log.append(&envelope).await?;
408            // Publish to subscribers (lossy on backpressure).
409            let _ = self.inner.subscriptions.publish(&envelope);
410
411            let in_handshake = state.as_ref().is_none_or(|s| !s.is_accepted());
412            if in_handshake && !envelope.payload.is_handshake() {
413                tracing::warn!(
414                    id = %envelope.id,
415                    type_name = envelope.payload.type_name(),
416                    "dropping non-handshake message before session.accepted",
417                );
418                continue;
419            }
420
421            match envelope.payload.clone() {
422                MessageType::SessionOpen(payload) => {
423                    state = Some(
424                        self.handle_session_open(&out_tx, envelope.id.clone(), payload)
425                            .await?,
426                    );
427                }
428                MessageType::SessionAuthenticate(payload) => {
429                    if let Some(s) = state.as_mut() {
430                        self.handle_session_authenticate(
431                            &out_tx,
432                            envelope.id.clone(),
433                            s,
434                            &payload.response,
435                        )
436                        .await?;
437                    } else {
438                        tracing::warn!("session.authenticate before session.open; dropping");
439                    }
440                }
441                MessageType::SessionClose(_) => {
442                    tracing::info!("session.close received");
443                    explicit_close = true;
444                    break Ok(());
445                }
446                MessageType::ToolInvoke(payload) => {
447                    if let Some(s) = state.as_ref() {
448                        self.spawn_tool_invoke(
449                            &out_tx,
450                            &jobs,
451                            envelope.id.clone(),
452                            s.session_id.clone(),
453                            s.principal.clone(),
454                            envelope.idempotency_key.clone(),
455                            payload,
456                        )
457                        .await;
458                    }
459                }
460                MessageType::Cancel(payload) => {
461                    if let Some(s) = state.as_ref() {
462                        self.handle_cancel(&out_tx, &jobs, envelope.id.clone(), s, &payload)
463                            .await;
464                    }
465                }
466                MessageType::Ping(_) => {
467                    let mut env = Envelope::new(MessageType::Pong(
468                        arcp_core::messages::PongPayload::default(),
469                    ));
470                    env.correlation_id = Some(envelope.id.clone());
471                    if let Some(s) = state.as_ref() {
472                        env.session_id = Some(s.session_id.clone());
473                    }
474                    let _ = out_tx.send(env).await;
475                }
476                MessageType::SessionPing(payload) => {
477                    // ARCP v1.1 §6.4: echo the nonce as `ping_nonce` in
478                    // `session.pong` and stamp `received_at`.
479                    let mut env = Envelope::new(MessageType::SessionPong(
480                        arcp_core::messages::SessionPongPayload {
481                            ping_nonce: payload.nonce,
482                            received_at: chrono::Utc::now(),
483                        },
484                    ));
485                    env.correlation_id = Some(envelope.id.clone());
486                    if let Some(s) = state.as_ref() {
487                        env.session_id = Some(s.session_id.clone());
488                    }
489                    let _ = out_tx.send(env).await;
490                }
491                MessageType::SessionPong(_) => {
492                    // Heartbeat replies are observed by the client driver
493                    // (see `client::heartbeat`); the runtime treats them as
494                    // liveness evidence implicitly via transport.recv().
495                }
496                MessageType::SessionAck(payload) => {
497                    // ARCP v1.1 §6.5: monotonically advance the
498                    // last-acked counter and wake the writer.
499                    let cur = last_ack.load(Ordering::Acquire);
500                    if payload.last_processed_seq > cur {
501                        last_ack.store(payload.last_processed_seq, Ordering::Release);
502                        ack_notify.notify_waiters();
503                    }
504                }
505                MessageType::SessionListJobs(payload) => {
506                    // ARCP v1.1 §6.6: read-only job inventory scoped to
507                    // the current session's principal. The Rust SDK
508                    // scopes by session_id; cross-session listing is a
509                    // deployment-policy extension.
510                    if let Some(s) = state.as_ref() {
511                        let jobs_list =
512                            jobs.list_for_session(&s.session_id, payload.filter.as_ref());
513                        let response =
514                            MessageType::SessionJobs(arcp_core::messages::SessionJobsPayload {
515                                request_id: envelope.id.to_string(),
516                                jobs: jobs_list,
517                                next_cursor: None,
518                            });
519                        let mut env = Envelope::new(response);
520                        env.correlation_id = Some(envelope.id.clone());
521                        env.session_id = Some(s.session_id.clone());
522                        let _ = out_tx.send(env).await;
523                    }
524                }
525                MessageType::Subscribe(payload) => {
526                    if let Some(s) = state.as_ref() {
527                        Self::handle_subscribe(
528                            &out_tx,
529                            &self.inner.subscriptions,
530                            &connection_subs,
531                            envelope.id.clone(),
532                            s.session_id.clone(),
533                            payload,
534                        )
535                        .await;
536                    }
537                }
538                MessageType::Unsubscribe(UnsubscribePayload { subscription_id }) => {
539                    if let Some((_, join)) = connection_subs.remove(&subscription_id) {
540                        join.abort();
541                    }
542                    let _ = self.inner.subscriptions.unsubscribe(&subscription_id);
543                }
544                MessageType::JobSubscribe(payload) => {
545                    if let Some(s) = state.as_ref() {
546                        Self::handle_job_subscribe(
547                            &out_tx,
548                            &self.inner.subscriptions,
549                            &self.inner.jobs,
550                            &self.inner.session_principals,
551                            &connection_job_subs,
552                            envelope.id.clone(),
553                            s.session_id.clone(),
554                            s.principal.clone(),
555                            payload,
556                        )
557                        .await;
558                    }
559                }
560                MessageType::JobUnsubscribe(JobUnsubscribePayload { job_id }) => {
561                    if let Some((_, join)) = connection_job_subs.remove(&job_id) {
562                        join.abort();
563                    }
564                }
565                MessageType::ArtifactPut(payload) => {
566                    if let Some(s) = state.as_ref() {
567                        Self::handle_artifact_put(
568                            &out_tx,
569                            &self.inner.artifacts,
570                            envelope.id.clone(),
571                            s.session_id.clone(),
572                            payload,
573                        )
574                        .await;
575                    }
576                }
577                MessageType::ArtifactFetch(payload) => {
578                    if let Some(s) = state.as_ref() {
579                        Self::handle_artifact_fetch(
580                            &out_tx,
581                            &self.inner.artifacts,
582                            envelope.id.clone(),
583                            s.session_id.clone(),
584                            payload,
585                        )
586                        .await;
587                    }
588                }
589                MessageType::ArtifactRelease(ArtifactReleasePayload { artifact_id }) => {
590                    self.inner.artifacts.release(&artifact_id);
591                }
592                _ if in_handshake => {
593                    tracing::warn!(
594                        type_name = envelope.payload.type_name(),
595                        "unexpected handshake message direction",
596                    );
597                }
598                _ => {
599                    tracing::debug!(
600                        type_name = envelope.payload.type_name(),
601                        "dispatch arm not yet implemented",
602                    );
603                }
604            }
605        };
606
607        // Tear down: stop per-connection subscription forwarders and
608        // drop the out_tx so the writer drains. Per ARCP v1.1 durable
609        // semantics (§10.1), in-flight jobs survive a transport drop —
610        // they are only cancelled when the client sends `session.close`.
611        if explicit_close {
612            if let Some(s) = state.as_ref() {
613                for snap in jobs.list_for_session(&s.session_id, None) {
614                    let _ = jobs.cancel(&snap.job_id);
615                }
616            }
617        }
618        if let Some(s) = state.as_ref() {
619            self.inner.session_principals.remove(&s.session_id);
620        }
621        for entry in connection_subs.iter() {
622            entry.value().abort();
623        }
624        connection_subs.clear();
625        for entry in connection_job_subs.iter() {
626            entry.value().abort();
627        }
628        connection_job_subs.clear();
629        drop(out_tx);
630        // Wake the writer if it's currently parked on the ack window so
631        // it can observe the closed channel and exit.
632        ack_notify.notify_waiters();
633        let _ = writer.await;
634        result
635    }
636
637    async fn handle_session_open(
638        &self,
639        out: &mpsc::Sender<Envelope>,
640        correlation_id: MessageId,
641        payload: SessionOpenPayload,
642    ) -> Result<SessionState, ARCPError> {
643        let SessionOpenPayload {
644            auth,
645            client,
646            capabilities: client_caps,
647        } = payload;
648
649        let negotiated = self.negotiate_capabilities(&client_caps);
650        let session_id = SessionId::new();
651        let mut state = SessionState::new(session_id.clone(), negotiated.clone());
652
653        let Some(authenticator) = self.inner.auth.get(&auth.scheme) else {
654            self.send_rejected(
655                out,
656                correlation_id,
657                ErrorCode::Unauthenticated,
658                format!("auth scheme {:?} not configured", auth.scheme),
659            )
660            .await;
661            state.phase = HandshakePhase::Closed;
662            return Ok(state);
663        };
664
665        let outcome = authenticator
666            .authenticate(&auth, &client, &negotiated)
667            .await?;
668
669        match outcome {
670            AuthOutcome::Accept { principal } => {
671                self.inner
672                    .session_principals
673                    .insert(session_id.clone(), Some(principal.clone()));
674                state.principal = Some(principal);
675                state.phase = HandshakePhase::Accepted;
676                self.send_accepted(out, correlation_id, &session_id, &negotiated)
677                    .await;
678            }
679            AuthOutcome::Challenge { challenge } => {
680                state.active_challenge = Some(challenge.clone());
681                state.phase = HandshakePhase::Challenged;
682                let mut env = Envelope::new(MessageType::SessionChallenge(
683                    arcp_core::messages::SessionChallengePayload {
684                        challenge: challenge.clone(),
685                    },
686                ));
687                env.correlation_id = Some(correlation_id);
688                env.session_id = Some(session_id);
689                let _ = out.send(env).await;
690            }
691            AuthOutcome::Reject { reason } => {
692                self.send_rejected(out, correlation_id, ErrorCode::Unauthenticated, reason)
693                    .await;
694                state.phase = HandshakePhase::Closed;
695            }
696        }
697        Ok(state)
698    }
699
700    async fn handle_session_authenticate(
701        &self,
702        out: &mpsc::Sender<Envelope>,
703        correlation_id: MessageId,
704        state: &mut SessionState,
705        response: &str,
706    ) -> Result<(), ARCPError> {
707        let Some(challenge) = state.active_challenge.clone() else {
708            tracing::warn!("session.authenticate without active challenge; dropping");
709            return Ok(());
710        };
711        for scheme in [
712            arcp_core::messages::AuthScheme::Bearer,
713            arcp_core::messages::AuthScheme::SignedJwt,
714        ] {
715            let Some(authenticator) = self.inner.auth.get(&scheme) else {
716                continue;
717            };
718            let outcome = authenticator
719                .verify_challenge_response(&challenge, response)
720                .await?;
721            match outcome {
722                AuthOutcome::Accept { principal } => {
723                    self.inner
724                        .session_principals
725                        .insert(state.session_id.clone(), Some(principal.clone()));
726                    state.principal = Some(principal);
727                    state.phase = HandshakePhase::Accepted;
728                    state.active_challenge = None;
729                    self.send_accepted(out, correlation_id, &state.session_id, &state.capabilities)
730                        .await;
731                    return Ok(());
732                }
733                AuthOutcome::Challenge { .. } | AuthOutcome::Reject { .. } => {}
734            }
735        }
736        let mut env = Envelope::new(MessageType::SessionUnauthenticated(
737            SessionUnauthenticatedPayload {
738                code: ErrorCode::Unauthenticated,
739                message: "challenge response did not validate".into(),
740            },
741        ));
742        env.correlation_id = Some(correlation_id);
743        env.session_id = Some(state.session_id.clone());
744        let _ = out.send(env).await;
745        Ok(())
746    }
747
748    #[allow(clippy::too_many_arguments, clippy::too_many_lines)]
749    async fn spawn_tool_invoke(
750        &self,
751        out: &mpsc::Sender<Envelope>,
752        jobs: &JobRegistry,
753        correlation_id: MessageId,
754        session_id: SessionId,
755        principal: Option<String>,
756        idempotency_key: Option<IdempotencyKey>,
757        payload: ToolInvokePayload,
758    ) {
759        // ARCP v1.1 §6.4 logical idempotency: a retry of the same
760        // command intent (same scope + key + tool + arguments) MUST
761        // resolve to the original `job.accepted`. A conflicting payload
762        // under the same key is rejected with FAILED_PRECONDITION.
763        let idempotency_scope = idempotency_key.as_ref().map(|key| IdempotencyScope {
764            principal_or_session: principal.clone().unwrap_or_else(|| session_id.to_string()),
765            idempotency_key: key.clone(),
766        });
767        let canonical_args = serde_json::to_string(&payload.arguments).unwrap_or_default();
768        if let Some(scope) = idempotency_scope.as_ref() {
769            if let Some(record) = self.inner.idempotency_index.get(scope) {
770                if record.tool == payload.tool && record.arguments_canonical == canonical_args {
771                    let mut accepted =
772                        Envelope::new(MessageType::JobAccepted(record.accepted.clone()));
773                    accepted.correlation_id = Some(correlation_id);
774                    accepted.session_id = Some(session_id);
775                    accepted.job_id = Some(record.accepted.job_id.clone());
776                    accepted.idempotency_key = idempotency_key;
777                    let _ = out.send(accepted).await;
778                    return;
779                }
780                let mut err = Envelope::new(MessageType::JobFailed(JobFailedPayload {
781                    code: ErrorCode::FailedPrecondition,
782                    retryable: Some(false),
783                    message: format!(
784                        "idempotency key {} already bound to a different command intent",
785                        scope.idempotency_key
786                    ),
787                    details: None,
788                }));
789                err.correlation_id = Some(correlation_id);
790                err.session_id = Some(session_id);
791                err.idempotency_key = idempotency_key;
792                let _ = out.send(err).await;
793                return;
794            }
795        }
796        let job_id = JobId::new();
797
798        // ARCP v1.1 §7.5: parse the requested tool/agent as an
799        // AgentRef so a `name@version` reference resolves correctly.
800        let agent_ref = match arcp_core::messages::AgentRef::parse(&payload.tool) {
801            Ok(r) => r,
802            Err(e) => {
803                let mut err = Envelope::new(MessageType::JobFailed(JobFailedPayload {
804                    code: ErrorCode::InvalidArgument,
805                    retryable: Some(false),
806                    message: format!("invalid agent reference {}: {e}", payload.tool),
807                    details: None,
808                }));
809                err.correlation_id = Some(correlation_id);
810                err.session_id = Some(session_id);
811                err.job_id = Some(job_id);
812                let _ = out.send(err).await;
813                return;
814            }
815        };
816        let lease = effective_lease(&payload);
817        let defer_accepted = self.inner.credential_provisioner.is_some() && lease.is_some();
818        let accepted_sent = if defer_accepted {
819            false
820        } else {
821            let mut accepted = Envelope::new(MessageType::JobAccepted(JobAcceptedPayload {
822                job_id: job_id.clone(),
823                credentials: vec![],
824                lease: lease.clone(),
825            }));
826            accepted.correlation_id = Some(correlation_id.clone());
827            accepted.session_id = Some(session_id.clone());
828            accepted.job_id = Some(job_id.clone());
829            let _ = out.send(accepted).await;
830            true
831        };
832
833        // If a version is pinned, the advertised inventory MUST satisfy
834        // it (§7.5). Surface AGENT_VERSION_NOT_AVAILABLE on miss.
835        if agent_ref.version.is_some() {
836            let advertised = &self.inner.advertised_capabilities.agents;
837            let satisfied = advertised
838                .as_ref()
839                .is_some_and(|inv| inv.satisfies(&agent_ref));
840            if !satisfied {
841                let mut err = Envelope::new(MessageType::JobFailed(JobFailedPayload {
842                    code: ErrorCode::AgentVersionNotAvailable,
843                    retryable: Some(false),
844                    message: format!("agent version not available: {}", agent_ref.format()),
845                    details: None,
846                }));
847                err.correlation_id = Some(correlation_id);
848                err.session_id = Some(session_id);
849                err.job_id = Some(job_id);
850                let _ = out.send(err).await;
851                return;
852            }
853        }
854
855        let Some(handler) = self.inner.tools.get(&agent_ref.name) else {
856            let mut err = Envelope::new(MessageType::JobFailed(JobFailedPayload {
857                code: ErrorCode::NotFound,
858                retryable: Some(false),
859                message: format!("tool not registered: {}", agent_ref.name),
860                details: None,
861            }));
862            err.correlation_id = Some(correlation_id);
863            err.session_id = Some(session_id);
864            err.job_id = Some(job_id);
865            let _ = out.send(err).await;
866            return;
867        };
868
869        let credentials = if let (Some(provisioner), Some(lease_ref)) =
870            (&self.inner.credential_provisioner, lease.as_ref())
871        {
872            let ctx = CredentialJobContext {
873                job_id: job_id.clone(),
874                session_id: session_id.clone(),
875                principal: principal.clone(),
876                parent_job_id: None,
877            };
878            match provisioner.issue(lease_ref, &ctx).await {
879                Ok(credentials) => {
880                    self.inner
881                        .credential_ledger
882                        .record_issued(&job_id, &credentials);
883                    credentials
884                }
885                Err(e) => {
886                    let mut err = Envelope::new(MessageType::JobFailed(JobFailedPayload {
887                        code: e.code(),
888                        retryable: Some(e.retryable()),
889                        message: e.to_string(),
890                        details: None,
891                    }));
892                    err.correlation_id = Some(correlation_id);
893                    err.session_id = Some(session_id);
894                    err.job_id = Some(job_id);
895                    let _ = out.send(err).await;
896                    return;
897                }
898            }
899        } else {
900            Vec::new()
901        };
902
903        // job.accepted
904        if !accepted_sent {
905            let mut accepted = Envelope::new(MessageType::JobAccepted(JobAcceptedPayload {
906                job_id: job_id.clone(),
907                credentials: credentials.clone(),
908                lease: lease.clone(),
909            }));
910            accepted.correlation_id = Some(correlation_id.clone());
911            accepted.session_id = Some(session_id.clone());
912            accepted.job_id = Some(job_id.clone());
913            let _ = out.send(accepted).await;
914        }
915
916        // §10.1 durable jobs outlive the transport, so the job's
917        // cancel token must NOT be a child of the connection token.
918        // Authorized cancel envelopes and explicit `session.close`
919        // drive cancellation explicitly through `jobs.cancel`.
920        let cancel = CancellationToken::new();
921        let entry = JobEntry {
922            job_id: job_id.clone(),
923            session_id: session_id.clone(),
924            correlation_id: correlation_id.clone(),
925            cancel: cancel.clone(),
926            state: JobState::Accepted,
927            // §7.5: listings show the resolved `name@version` string.
928            agent: agent_ref.format(),
929            created_at: chrono::Utc::now(),
930            last_event_seq: 0,
931            parent_job_id: None,
932            credential_ids: self.inner.credential_ledger.outstanding_for_job(&job_id),
933            lease: lease.clone(),
934        };
935
936        let out_clone = out.clone();
937        let jobs_clone = jobs.clone();
938        let provisioner_clone = self.inner.credential_provisioner.clone();
939        let credential_ledger_clone = self.inner.credential_ledger.clone();
940        let cancel_for_task = cancel;
941        // ARCP v1.1 §9.6: seed the per-job budget tracker from the
942        // `cost_budget` field on `tool.invoke`. Absent / empty means
943        // budgeting is disabled for this job.
944        let budget_tracker = lease
945            .as_ref()
946            .and_then(|lease| lease.cost_budget.as_ref())
947            .map_or_else(crate::runtime::context::BudgetTracker::new, |budget| {
948                crate::runtime::context::BudgetTracker::from_budget(budget)
949            });
950
951        // Record the accepted payload against the (scope, key) tuple so
952        // a future retry resolves to this same job_id instead of
953        // spawning a duplicate (§6.4).
954        if let Some(scope) = idempotency_scope {
955            self.inner.idempotency_index.insert(
956                scope,
957                IdempotencyRecord {
958                    accepted: JobAcceptedPayload {
959                        job_id: job_id.clone(),
960                        credentials: credentials.clone(),
961                        lease: lease.clone(),
962                    },
963                    tool: agent_ref.format(),
964                    arguments_canonical: canonical_args,
965                },
966            );
967        }
968
969        let join = tokio::spawn(async move {
970            // job.started
971            let mut started = Envelope::new(MessageType::JobStarted(JobStartedPayload {
972                description: Some(format!("invoking {}", payload.tool)),
973            }));
974            started.correlation_id = Some(correlation_id.clone());
975            started.session_id = Some(session_id.clone());
976            started.job_id = Some(job_id.clone());
977            let _ = out_clone.send(started).await;
978            jobs_clone.set_state(&job_id, JobState::Running);
979
980            let ctx = ToolContext {
981                cancel: cancel_for_task.clone(),
982                job_id: job_id.clone(),
983                session_id: session_id.clone(),
984                correlation_id: correlation_id.clone(),
985                out: out_clone.clone(),
986                budget: budget_tracker,
987                lease,
988            };
989
990            let outcome = tokio::select! {
991                () = cancel_for_task.cancelled() => Outcome::Cancelled("cancellation token fired".into()),
992                result = handler.invoke(payload.arguments, ctx) => match result {
993                    Ok(value) => Outcome::Completed(value),
994                    Err(ARCPError::Cancelled { reason }) => Outcome::Cancelled(reason),
995                    Err(e) => Outcome::Failed(e),
996                },
997            };
998
999            let terminal = match outcome {
1000                Outcome::Completed(value) => {
1001                    jobs_clone.set_state(&job_id, JobState::Completed);
1002                    // Allow agents that stream results to indicate the
1003                    // terminating job.completed should reference a
1004                    // `result_id` (ARCP v1.1 §8.4) by returning the
1005                    // sentinel shape `{ "$arcp_streamed_result": {
1006                    // result_id, result_size?, summary? } }`. Everything
1007                    // else flows through as `value` (the v1.0 path).
1008                    let completed = streamed_result_from_value(value);
1009                    MessageType::JobCompleted(completed)
1010                }
1011                Outcome::Failed(e) => {
1012                    jobs_clone.set_state(&job_id, JobState::Failed);
1013                    MessageType::JobFailed(JobFailedPayload {
1014                        code: e.code(),
1015                        retryable: Some(e.retryable()),
1016                        message: e.to_string(),
1017                        details: None,
1018                    })
1019                }
1020                Outcome::Cancelled(reason) => {
1021                    jobs_clone.set_state(&job_id, JobState::Cancelled);
1022                    MessageType::JobCancelled(JobCancelledPayload {
1023                        reason: Some(reason),
1024                    })
1025                }
1026            };
1027            let mut term = Envelope::new(terminal);
1028            term.correlation_id = Some(correlation_id);
1029            term.session_id = Some(session_id);
1030            term.job_id = Some(job_id.clone());
1031            let _ = out_clone.send(term).await;
1032            if let Some(provisioner) = provisioner_clone.as_ref() {
1033                if let Err(e) =
1034                    revoke_all_for_job(&credential_ledger_clone, provisioner, &job_id).await
1035                {
1036                    tracing::warn!(error = %e, job_id = %job_id, "failed to revoke credentials");
1037                }
1038            }
1039        });
1040
1041        jobs.insert(entry, join);
1042    }
1043
1044    async fn handle_cancel(
1045        &self,
1046        out: &mpsc::Sender<Envelope>,
1047        jobs: &JobRegistry,
1048        correlation_id: MessageId,
1049        requester: &SessionState,
1050        payload: &CancelPayload,
1051    ) {
1052        let CancelPayload {
1053            target, target_id, ..
1054        } = payload;
1055        match target {
1056            CancelTargetKind::Job => {
1057                #[allow(clippy::option_if_let_else)] // map_or_else nests too deeply here
1058                let response_payload = if let Ok(job_id) = target_id.parse::<JobId>() {
1059                    if let Some(snap) = jobs.snapshot(&job_id) {
1060                        // ARCP v1.1 §7.6 / §10: cancel authority is
1061                        // bound to the owning session or the same
1062                        // authenticated principal. A subscriber that
1063                        // merely knows another session's job id MUST
1064                        // NOT be able to cancel it.
1065                        let authorized = snap.session_id == requester.session_id
1066                            || cancel_principal_matches(
1067                                &self.inner.session_principals,
1068                                &snap.session_id,
1069                                requester.principal.as_deref(),
1070                            );
1071                        if authorized {
1072                            if jobs.cancel(&job_id) {
1073                                MessageType::CancelAccepted(
1074                                    arcp_core::messages::CancelAcceptedPayload {
1075                                        target_id: Some(target_id.clone()),
1076                                    },
1077                                )
1078                            } else {
1079                                MessageType::CancelRefused(
1080                                    arcp_core::messages::CancelRefusedPayload {
1081                                        target_id: target_id.clone(),
1082                                        reason: "job is no longer in-flight".into(),
1083                                    },
1084                                )
1085                            }
1086                        } else {
1087                            MessageType::CancelRefused(arcp_core::messages::CancelRefusedPayload {
1088                                target_id: target_id.clone(),
1089                                reason: "permission denied: not authorized to cancel this job"
1090                                    .into(),
1091                            })
1092                        }
1093                    } else {
1094                        MessageType::CancelRefused(arcp_core::messages::CancelRefusedPayload {
1095                            target_id: target_id.clone(),
1096                            reason: "no such in-flight job".into(),
1097                        })
1098                    }
1099                } else {
1100                    MessageType::CancelRefused(arcp_core::messages::CancelRefusedPayload {
1101                        target_id: target_id.clone(),
1102                        reason: "malformed job id".into(),
1103                    })
1104                };
1105                let mut env = Envelope::new(response_payload);
1106                env.correlation_id = Some(correlation_id);
1107                env.session_id = Some(requester.session_id.clone());
1108                let _ = out.send(env).await;
1109            }
1110            CancelTargetKind::Stream | CancelTargetKind::Session => {
1111                tracing::warn!(?target, "cancel target not yet implemented");
1112            }
1113        }
1114    }
1115
1116    fn negotiate_capabilities(&self, client_caps: &Capabilities) -> Capabilities {
1117        // Intersection: a capability is enabled only if both sides set it.
1118        let runtime_caps = &self.inner.advertised_capabilities;
1119        Capabilities {
1120            streaming: intersect_bool(runtime_caps.streaming, client_caps.streaming),
1121            durable_jobs: intersect_bool(runtime_caps.durable_jobs, client_caps.durable_jobs),
1122            checkpoints: intersect_bool(runtime_caps.checkpoints, client_caps.checkpoints),
1123            binary_streams: intersect_bool(runtime_caps.binary_streams, client_caps.binary_streams),
1124            agent_handoff: intersect_bool(runtime_caps.agent_handoff, client_caps.agent_handoff),
1125            model_use: intersect_bool(runtime_caps.model_use, client_caps.model_use),
1126            provisioned_credentials: intersect_bool(
1127                runtime_caps.provisioned_credentials,
1128                client_caps.provisioned_credentials,
1129            ),
1130            artifacts: intersect_bool(runtime_caps.artifacts, client_caps.artifacts),
1131            subscriptions: intersect_bool(runtime_caps.subscriptions, client_caps.subscriptions),
1132            scheduled_jobs: intersect_bool(runtime_caps.scheduled_jobs, client_caps.scheduled_jobs),
1133            interrupt: intersect_bool(runtime_caps.interrupt, client_caps.interrupt),
1134            anonymous: intersect_bool(runtime_caps.anonymous, client_caps.anonymous),
1135            heartbeat_recovery: runtime_caps.heartbeat_recovery.clone(),
1136            binary_encoding: runtime_caps.binary_encoding.clone(),
1137            extensions: runtime_caps
1138                .extensions
1139                .iter()
1140                .filter(|e| client_caps.extensions.contains(e))
1141                .cloned()
1142                .collect(),
1143            artifact_retention: runtime_caps.artifact_retention.clone(),
1144            // ARCP v1.1 §7.5 — pass the runtime's agent inventory
1145            // through to the negotiated capability block. Clients
1146            // typically do not advertise agents, so this is a
1147            // server-side pass-through.
1148            agents: runtime_caps.agents.clone(),
1149            extra: std::collections::BTreeMap::new(),
1150        }
1151    }
1152
1153    async fn send_accepted(
1154        &self,
1155        out: &mpsc::Sender<Envelope>,
1156        correlation_id: MessageId,
1157        session_id: &SessionId,
1158        capabilities: &Capabilities,
1159    ) {
1160        let lease = self.inner.session_lease_seconds.map(|s| SessionLease {
1161            expires_at: chrono::Utc::now()
1162                + chrono::Duration::seconds(i64::try_from(s).unwrap_or(i64::MAX)),
1163        });
1164        let mut env = Envelope::new(MessageType::SessionAccepted(SessionAcceptedPayload {
1165            session_id: session_id.clone(),
1166            runtime: self.inner.runtime_identity.clone(),
1167            capabilities: capabilities.clone(),
1168            lease,
1169        }));
1170        env.correlation_id = Some(correlation_id);
1171        env.session_id = Some(session_id.clone());
1172        let _ = out.send(env).await;
1173    }
1174
1175    async fn send_rejected(
1176        &self,
1177        out: &mpsc::Sender<Envelope>,
1178        correlation_id: MessageId,
1179        code: ErrorCode,
1180        message: String,
1181    ) {
1182        let mut env = Envelope::new(MessageType::SessionRejected(SessionRejectedPayload {
1183            code,
1184            message,
1185        }));
1186        env.correlation_id = Some(correlation_id);
1187        let _ = out.send(env).await;
1188    }
1189
1190    async fn handle_subscribe(
1191        out: &mpsc::Sender<Envelope>,
1192        manager: &SubscriptionManager,
1193        connection_subs: &Arc<DashMap<SubscriptionId, JoinHandle<()>>>,
1194        correlation_id: MessageId,
1195        session_id: SessionId,
1196        payload: SubscribePayload,
1197    ) {
1198        let SubscribePayload { filter, since: _ } = payload;
1199        // PLAN.md §A4.10 reserves richer authorisation; for v0.1 we accept
1200        // any filter from an authenticated session.
1201        let (subscription_id, mut rx) = manager.register(filter, session_id.clone());
1202        // Acknowledge the subscription.
1203        let mut accepted =
1204            Envelope::new(MessageType::SubscribeAccepted(SubscribeAcceptedPayload {
1205                subscription_id: subscription_id.clone(),
1206            }));
1207        accepted.correlation_id = Some(correlation_id);
1208        accepted.session_id = Some(session_id);
1209        accepted.subscription_id = Some(subscription_id.clone());
1210        let _ = out.send(accepted).await;
1211
1212        // Spawn a forwarder task that wraps each delivered envelope in a
1213        // subscribe.event and pushes to the outbound channel. Backfill
1214        // (the §13.3 boundary marker) is left for a follow-up.
1215        let out_clone = out.clone();
1216        let sub_id = subscription_id.clone();
1217        let join = tokio::spawn(async move {
1218            while let Some(event) = rx.next().await {
1219                let value = match serde_json::to_value(&event) {
1220                    Ok(v) => v,
1221                    Err(e) => {
1222                        tracing::warn!(error = %e, "subscribe.event serialise failed");
1223                        continue;
1224                    }
1225                };
1226                let mut wrapper =
1227                    Envelope::new(MessageType::SubscribeEvent(SubscribeEventPayload {
1228                        event: value,
1229                    }));
1230                wrapper.subscription_id = Some(sub_id.clone());
1231                if out_clone.send(wrapper).await.is_err() {
1232                    break;
1233                }
1234            }
1235        });
1236        connection_subs.insert(subscription_id, join);
1237    }
1238
1239    #[allow(clippy::too_many_arguments)]
1240    async fn handle_job_subscribe(
1241        out: &mpsc::Sender<Envelope>,
1242        manager: &SubscriptionManager,
1243        jobs: &JobRegistry,
1244        session_principals: &DashMap<SessionId, Option<String>>,
1245        connection_job_subs: &Arc<DashMap<JobId, JoinHandle<()>>>,
1246        correlation_id: MessageId,
1247        subscriber_session: SessionId,
1248        subscriber_principal: Option<String>,
1249        payload: JobSubscribePayload,
1250    ) {
1251        let JobSubscribePayload {
1252            job_id,
1253            from_event_seq: _,
1254            history: _,
1255        } = payload;
1256
1257        let Some(snap) = jobs.snapshot(&job_id) else {
1258            let mut err = Envelope::new(MessageType::Nack(NackPayload {
1259                code: ErrorCode::NotFound,
1260                message: format!("no such job: {job_id}"),
1261                details: None,
1262            }));
1263            err.correlation_id = Some(correlation_id);
1264            err.session_id = Some(subscriber_session);
1265            let _ = out.send(err).await;
1266            return;
1267        };
1268
1269        // Authorization (§7.6): subscribing session's principal must
1270        // match the submitter's principal. The submitter is always
1271        // permitted (same session_id).
1272        if snap.session_id != subscriber_session {
1273            let submitter_principal = session_principals
1274                .get(&snap.session_id)
1275                .and_then(|p| p.value().clone());
1276            let permitted = match (&submitter_principal, &subscriber_principal) {
1277                (Some(a), Some(b)) => a == b,
1278                _ => false,
1279            };
1280            if !permitted {
1281                let mut err = Envelope::new(MessageType::Nack(NackPayload {
1282                    code: ErrorCode::PermissionDenied,
1283                    message: "principal not authorized to subscribe to this job".into(),
1284                    details: None,
1285                }));
1286                err.correlation_id = Some(correlation_id);
1287                err.session_id = Some(subscriber_session);
1288                err.job_id = Some(job_id);
1289                let _ = out.send(err).await;
1290                return;
1291            }
1292        }
1293
1294        // Build a filter that selects only this job's envelopes.
1295        let filter = arcp_core::messages::SubscriptionFilter {
1296            job_id: vec![job_id.clone()],
1297            ..arcp_core::messages::SubscriptionFilter::default()
1298        };
1299        let (_internal_id, mut rx) = manager.register(filter, subscriber_session.clone());
1300
1301        // Acknowledge.
1302        let ack = JobSubscribedPayload {
1303            job_id: job_id.clone(),
1304            current_status: snap.state.wire_str().to_owned(),
1305            agent: snap.agent.clone(),
1306            parent_job_id: snap.parent_job_id.clone(),
1307            trace_id: None,
1308            subscribed_from: snap.last_event_seq,
1309            // History replay is not yet implemented in this SDK; the ack
1310            // always carries `replayed: false`, matching live-only
1311            // semantics (§7.6 permits `history: false`).
1312            replayed: false,
1313        };
1314        let mut ack_env = Envelope::new(MessageType::JobSubscribed(ack));
1315        ack_env.correlation_id = Some(correlation_id);
1316        ack_env.session_id = Some(subscriber_session.clone());
1317        ack_env.job_id = Some(job_id.clone());
1318        let _ = out.send(ack_env).await;
1319
1320        // Spawn forwarder: rewrites session_id to the subscriber's so
1321        // client-side parsers route correctly. The originating session's
1322        // own writer is responsible for the submitter's copy; here we
1323        // only fan out a clone to the subscriber.
1324        let out_clone = out.clone();
1325        let subscriber_session_clone = subscriber_session;
1326        let job_id_clone = job_id.clone();
1327        let connection_job_subs_clone = Arc::clone(connection_job_subs);
1328        let join = tokio::spawn(async move {
1329            while let Some(mut env) = rx.next().await {
1330                // Only forward server-originated, job-scoped envelopes.
1331                // Skip subscriber's own client-to-server messages (e.g.
1332                // tool.invoke, cancel) which can appear on the bus.
1333                if !is_forwardable_job_event(&env.payload) {
1334                    continue;
1335                }
1336                env.session_id = Some(subscriber_session_clone.clone());
1337                if out_clone.send(env).await.is_err() {
1338                    break;
1339                }
1340            }
1341            // Forwarder exited (job terminal or unsubscribe).
1342            connection_job_subs_clone.remove(&job_id_clone);
1343        });
1344        connection_job_subs.insert(job_id, join);
1345    }
1346
1347    async fn handle_artifact_put(
1348        out: &mpsc::Sender<Envelope>,
1349        store: &ArtifactStore,
1350        correlation_id: MessageId,
1351        session_id: SessionId,
1352        payload: ArtifactPutPayload,
1353    ) {
1354        let ArtifactPutPayload {
1355            media_type,
1356            data,
1357            sha256,
1358            retain_seconds,
1359        } = payload;
1360        let mut env = match store.put(media_type, &data, retain_seconds, sha256) {
1361            Ok(reference) => Envelope::new(MessageType::ArtifactRef(ArtifactRefPayload {
1362                artifact: reference,
1363            })),
1364            Err(e) => Envelope::new(MessageType::Nack(NackPayload {
1365                code: e.code(),
1366                message: e.to_string(),
1367                details: None,
1368            })),
1369        };
1370        env.correlation_id = Some(correlation_id);
1371        env.session_id = Some(session_id);
1372        let _ = out.send(env).await;
1373    }
1374
1375    async fn handle_artifact_fetch(
1376        out: &mpsc::Sender<Envelope>,
1377        store: &ArtifactStore,
1378        correlation_id: MessageId,
1379        session_id: SessionId,
1380        payload: ArtifactFetchPayload,
1381    ) {
1382        let ArtifactFetchPayload { artifact_id } = payload;
1383        let mut env = match store.fetch(&artifact_id) {
1384            Ok((data, media_type)) => Envelope::new(MessageType::ArtifactPut(ArtifactPutPayload {
1385                media_type,
1386                data,
1387                sha256: None,
1388                retain_seconds: None,
1389            })),
1390            Err(e) => Envelope::new(MessageType::Nack(NackPayload {
1391                code: e.code(),
1392                message: e.to_string(),
1393                details: None,
1394            })),
1395        };
1396        env.correlation_id = Some(correlation_id);
1397        env.session_id = Some(session_id);
1398        let _ = out.send(env).await;
1399    }
1400}
1401
/// Three-way result: success with a JSON value, failure with an
/// [`ARCPError`], or cancellation with a string.
///
/// NOTE(review): the code that builds and consumes this enum is outside
/// this excerpt; presumably it is the terminal result of a job run —
/// confirm against the caller.
enum Outcome {
    /// Success, carrying a JSON value.
    Completed(serde_json::Value),
    /// Failure, carrying the error.
    Failed(ARCPError),
    /// Cancellation; the string is presumably the cancellation reason —
    /// TODO confirm at the construction site.
    Cancelled(String),
}
1407
1408fn effective_lease(payload: &ToolInvokePayload) -> Option<LeaseRequest> {
1409    if let Some(lease) = payload.lease_request.clone() {
1410        return Some(lease);
1411    }
1412    payload.cost_budget.clone().map(|cost_budget| LeaseRequest {
1413        cost_budget: Some(cost_budget),
1414        model_use: None,
1415        expires_at: None,
1416        extra: std::collections::BTreeMap::new(),
1417    })
1418}
1419
1420fn redact_for_subscribers(env: &Envelope) -> Envelope {
1421    let mut out = env.clone();
1422    if let MessageType::JobAccepted(payload) = &mut out.payload {
1423        payload.credentials.clear();
1424    }
1425    out
1426}
1427
/// Sentinel key for streamed-result agents (ARCP v1.1 §8.4).
///
/// When an agent's returned value is a single-entry object keyed by this
/// constant, and that entry's value carries a string `result_id`, the
/// runtime promotes the payload (`result_id`, `result_size`, `summary`)
/// onto the terminating `job.completed` rather than carrying the sentinel
/// through as `value`. Without a string `result_id` the returned value is
/// passed through unchanged as `value`.
pub const STREAMED_RESULT_SENTINEL: &str = "$arcp_streamed_result";
1435
1436/// Build a [`JobCompletedPayload`] from a tool's returned value,
1437/// recognising the streaming-result sentinel.
1438fn streamed_result_from_value(value: serde_json::Value) -> JobCompletedPayload {
1439    if let Some(obj) = value.as_object() {
1440        if obj.len() == 1 {
1441            if let Some(inner) = obj.get(STREAMED_RESULT_SENTINEL) {
1442                let result_id = inner
1443                    .get("result_id")
1444                    .and_then(|v| v.as_str())
1445                    .map(String::from);
1446                let result_size = inner.get("result_size").and_then(serde_json::Value::as_u64);
1447                let summary = inner
1448                    .get("summary")
1449                    .and_then(|v| v.as_str())
1450                    .map(String::from);
1451                if result_id.is_some() {
1452                    return JobCompletedPayload {
1453                        value: None,
1454                        result_ref: None,
1455                        result_id,
1456                        result_size,
1457                        summary,
1458                    };
1459                }
1460            }
1461        }
1462    }
1463    JobCompletedPayload {
1464        value: Some(value),
1465        result_ref: None,
1466        result_id: None,
1467        result_size: None,
1468        summary: None,
1469    }
1470}
1471
1472/// True when an envelope is a server-emitted job event suitable for
1473/// `job.subscribe` forwarding (ARCP v1.1 §7.6).
1474///
1475/// Filters out client-to-server commands that happen to carry `job_id`
1476/// (e.g. `cancel`, `tool.invoke`).
1477const fn is_forwardable_job_event(payload: &MessageType) -> bool {
1478    matches!(
1479        payload,
1480        MessageType::JobAccepted(_)
1481            | MessageType::JobStarted(_)
1482            | MessageType::JobProgress(_)
1483            | MessageType::JobHeartbeat(_)
1484            | MessageType::JobCompleted(_)
1485            | MessageType::JobFailed(_)
1486            | MessageType::JobCancelled(_)
1487            | MessageType::JobResultChunk(_)
1488            | MessageType::ToolResult(_)
1489            | MessageType::ToolError(_)
1490            | MessageType::Log(_)
1491            | MessageType::Metric(_)
1492            | MessageType::StreamOpen(_)
1493            | MessageType::StreamChunk(_)
1494            | MessageType::StreamClose(_)
1495            | MessageType::StreamError(_)
1496            | MessageType::ArtifactRef(_)
1497    )
1498}
1499
1500/// Same-principal authorization helper for cross-session cancel
1501/// (ARCP v1.1 §10). Returns `true` only when the requesting session's
1502/// principal is non-anonymous and matches the principal that originally
1503/// submitted the job.
1504fn cancel_principal_matches(
1505    session_principals: &DashMap<SessionId, Option<String>>,
1506    owning_session: &SessionId,
1507    requester_principal: Option<&str>,
1508) -> bool {
1509    let Some(requester_principal) = requester_principal else {
1510        return false;
1511    };
1512    session_principals
1513        .get(owning_session)
1514        .and_then(|p| p.value().clone())
1515        .is_some_and(|owner| owner == requester_principal)
1516}
1517
/// Intersect two optional boolean capability flags.
///
/// * Neither side advertised (`None`, `None`) → `None`, so the field is
///   elided on the wire, matching RFC §7's "absent = false" semantics.
/// * Otherwise → `Some(x)`, where `x` is `true` only if both sides
///   advertised `Some(true)`.
const fn intersect_bool(a: Option<bool>, b: Option<bool>) -> Option<bool> {
    if a.is_none() && b.is_none() {
        return None;
    }
    Some(matches!((a, b), (Some(true), Some(true))))
}