Skip to main content

chio_kernel/kernel/
session_ops.rs

1use std::sync::atomic::{AtomicU64, Ordering};
2
3use super::*;
4
5static GLOBAL_SESSION_COUNTER: AtomicU64 = AtomicU64::new(0);
6
7fn bump_session_counter_from_id(session_id: &SessionId) {
8    let Some(number) = session_id
9        .as_str()
10        .strip_prefix("sess-")
11        .and_then(|value| value.parse::<u64>().ok())
12    else {
13        return;
14    };
15
16    let mut current = GLOBAL_SESSION_COUNTER.load(Ordering::SeqCst);
17    while current < number {
18        match GLOBAL_SESSION_COUNTER.compare_exchange(
19            current,
20            number,
21            Ordering::SeqCst,
22            Ordering::SeqCst,
23        ) {
24            Ok(_) => break,
25            Err(observed) => current = observed,
26        }
27    }
28}
29
30impl ChioKernel {
31    pub fn open_session(
32        &self,
33        agent_id: AgentId,
34        issued_capabilities: Vec<CapabilityToken>,
35    ) -> SessionId {
36        let session_number = GLOBAL_SESSION_COUNTER.fetch_add(1, Ordering::SeqCst) + 1;
37        let session_id = SessionId::new(format!("sess-{}", session_number));
38
39        self.open_session_with_id(session_id, agent_id, issued_capabilities)
40            .unwrap_or_else(|error| panic!("failed to open session: {error}"))
41    }
42
43    pub fn open_session_with_id(
44        &self,
45        session_id: SessionId,
46        agent_id: AgentId,
47        issued_capabilities: Vec<CapabilityToken>,
48    ) -> Result<SessionId, KernelError> {
49        bump_session_counter_from_id(&session_id);
50
51        info!(session_id = %session_id, agent_id = %agent_id, "opening session");
52        let session_snapshot = self.with_sessions_write(|sessions| {
53            if sessions.contains_key(&session_id) {
54                return Err(KernelError::SessionAlreadyExists(session_id.clone()));
55            }
56            let session = Session::new(session_id.clone(), agent_id, issued_capabilities);
57            let snapshot = session.clone();
58            sessions.insert(session_id.clone(), session);
59            Ok(snapshot)
60        })?;
61        if let Err(error) = self.persist_session_anchor_snapshot(&session_snapshot, None) {
62            self.with_sessions_write(|sessions| {
63                sessions.remove(&session_id);
64                Ok(())
65            })?;
66            return Err(error);
67        }
68
69        Ok(session_id)
70    }
71
72    /// Transition a session into the `ready` state once setup is complete.
73    pub fn activate_session(&self, session_id: &SessionId) -> Result<(), KernelError> {
74        self.validate_web3_evidence_prerequisites()?;
75        self.with_session_mut(session_id, |session| {
76            session.activate()?;
77            Ok(())
78        })
79    }
80
81    /// Persist transport/session authentication context for a session.
82    pub fn set_session_auth_context(
83        &self,
84        session_id: &SessionId,
85        auth_context: SessionAuthContext,
86    ) -> Result<(), KernelError> {
87        let (session_snapshot, supersedes_anchor_id) =
88            self.with_session_mut(session_id, |session| {
89                let previous_anchor_id = session.session_anchor().id().to_string();
90                session.set_auth_context(auth_context);
91                let supersedes_anchor_id = (session.session_anchor().id() != previous_anchor_id)
92                    .then_some(previous_anchor_id);
93                Ok((session.clone(), supersedes_anchor_id))
94            })?;
95        self.persist_session_anchor_snapshot(&session_snapshot, supersedes_anchor_id.as_deref())
96    }
97
98    /// Persist peer capabilities negotiated at the edge for a session.
99    pub fn set_session_peer_capabilities(
100        &self,
101        session_id: &SessionId,
102        peer_capabilities: PeerCapabilities,
103    ) -> Result<(), KernelError> {
104        self.with_session_mut(session_id, |session| {
105            session.set_peer_capabilities(peer_capabilities);
106            Ok(())
107        })
108    }
109
110    /// Replace the session's current root snapshot.
111    pub fn replace_session_roots(
112        &self,
113        session_id: &SessionId,
114        roots: Vec<RootDefinition>,
115    ) -> Result<(), KernelError> {
116        self.with_session_mut(session_id, |session| {
117            session.replace_roots(roots);
118            Ok(())
119        })
120    }
121
122    /// Return the runtime's normalized root view for a session.
123    pub fn normalized_session_roots(
124        &self,
125        session_id: &SessionId,
126    ) -> Result<Vec<NormalizedRoot>, KernelError> {
127        self.with_session(session_id, |session| {
128            Ok(session.normalized_roots().to_vec())
129        })
130    }
131
132    /// Return only the enforceable filesystem root paths for a session.
133    pub fn enforceable_filesystem_root_paths(
134        &self,
135        session_id: &SessionId,
136    ) -> Result<Vec<String>, KernelError> {
137        self.with_session(session_id, |session| {
138            Ok(session
139                .enforceable_filesystem_roots()
140                .filter_map(NormalizedRoot::normalized_filesystem_path)
141                .map(str::to_string)
142                .collect())
143        })
144    }
145
146    pub(crate) fn session_enforceable_filesystem_root_paths_owned(
147        &self,
148        session_id: &SessionId,
149    ) -> Result<Vec<String>, KernelError> {
150        self.with_session(session_id, |session| {
151            Ok(session
152                .enforceable_filesystem_roots()
153                .filter_map(NormalizedRoot::normalized_filesystem_path)
154                .map(str::to_string)
155                .collect())
156        })
157    }
158
159    pub(crate) fn resource_path_within_root(candidate: &str, root: &str) -> bool {
160        if candidate == root {
161            return true;
162        }
163
164        if root == "/" {
165            return candidate.starts_with('/');
166        }
167
168        candidate
169            .strip_prefix(root)
170            .map(|suffix| suffix.starts_with('/'))
171            .unwrap_or(false)
172    }
173
174    pub(crate) fn resource_path_matches_session_roots(
175        path: &str,
176        session_roots: &[String],
177    ) -> bool {
178        if session_roots.is_empty() {
179            return false;
180        }
181
182        session_roots
183            .iter()
184            .any(|root| Self::resource_path_within_root(path, root))
185    }
186
187    pub(crate) fn enforce_resource_roots(
188        &self,
189        context: &OperationContext,
190        operation: &ReadResourceOperation,
191    ) -> Result<(), KernelError> {
192        match operation.classify_uri_for_runtime() {
193            ResourceUriClassification::NonFileSystem { .. } => Ok(()),
194            ResourceUriClassification::EnforceableFileSystem {
195                normalized_path, ..
196            } => {
197                let session_roots =
198                    self.session_enforceable_filesystem_root_paths_owned(&context.session_id)?;
199
200                if Self::resource_path_matches_session_roots(&normalized_path, &session_roots) {
201                    Ok(())
202                } else {
203                    let reason = if session_roots.is_empty() {
204                        "no enforceable filesystem roots are available for this session".to_string()
205                    } else {
206                        format!(
207                            "filesystem-backed resource path {normalized_path} is outside the negotiated roots"
208                        )
209                    };
210
211                    Err(KernelError::ResourceRootDenied {
212                        uri: operation.uri.clone(),
213                        reason,
214                    })
215                }
216            }
217            ResourceUriClassification::UnenforceableFileSystem { reason, .. } => {
218                Err(KernelError::ResourceRootDenied {
219                    uri: operation.uri.clone(),
220                    reason: format!(
221                        "filesystem-backed resource URI could not be enforced: {reason}"
222                    ),
223                })
224            }
225        }
226    }
227
228    pub(crate) fn build_resource_read_deny_receipt(
229        &self,
230        operation: &ReadResourceOperation,
231        reason: &str,
232    ) -> Result<ChioReceipt, KernelError> {
233        let receipt_content = receipt_content_for_output(None, None)?;
234        let action = ToolCallAction::from_parameters(serde_json::json!({
235            "uri": &operation.uri,
236        }))
237        .map_err(|error| {
238            KernelError::ReceiptSigningFailed(format!(
239                "failed to hash resource read parameters: {error}"
240            ))
241        })?;
242
243        let receipt = self.build_and_sign_receipt(ReceiptParams {
244            capability_id: &operation.capability.id,
245            tool_name: "resources/read",
246            server_id: "session",
247            decision: Decision::Deny {
248                reason: reason.to_string(),
249                guard: "session_roots".to_string(),
250            },
251            action,
252            content_hash: receipt_content.content_hash,
253            metadata: merge_metadata_objects(
254                Some(serde_json::json!({
255                    "resource": {
256                        "uri": &operation.uri,
257                    }
258                })),
259                receipt_attribution_metadata(&operation.capability, None),
260            ),
261            timestamp: current_unix_timestamp(),
262            trust_level: chio_core::TrustLevel::default(),
263            tenant_id: None,
264        })?;
265
266        self.record_chio_receipt(&receipt)?;
267        Ok(receipt)
268    }
269
270    /// Subscribe the session to update notifications for a concrete resource URI.
271    pub fn subscribe_session_resource(
272        &self,
273        session_id: &SessionId,
274        capability: &CapabilityToken,
275        agent_id: &str,
276        uri: &str,
277    ) -> Result<(), KernelError> {
278        self.validate_non_tool_capability(capability, agent_id)?;
279
280        if !capability_matches_resource_subscription(capability, uri)? {
281            return Err(KernelError::OutOfScopeResource {
282                uri: uri.to_string(),
283            });
284        }
285
286        if !self.resource_exists(uri)? {
287            return Err(KernelError::ResourceNotRegistered(uri.to_string()));
288        }
289
290        self.with_session_mut(session_id, |session| {
291            session.subscribe_resource(uri.to_string());
292            Ok(())
293        })
294    }
295
296    /// Remove a session-scoped resource subscription. Missing subscriptions are ignored.
297    pub fn unsubscribe_session_resource(
298        &self,
299        session_id: &SessionId,
300        uri: &str,
301    ) -> Result<(), KernelError> {
302        self.with_session_mut(session_id, |session| {
303            session.unsubscribe_resource(uri);
304            Ok(())
305        })
306    }
307
308    /// Check whether a session currently holds a resource subscription.
309    pub fn session_has_resource_subscription(
310        &self,
311        session_id: &SessionId,
312        uri: &str,
313    ) -> Result<bool, KernelError> {
314        self.with_session(
315            session_id,
316            |session| Ok(session.is_resource_subscribed(uri)),
317        )
318    }
319
320    /// Mark a session as draining. New tool calls are rejected after this point.
321    pub fn begin_draining_session(&self, session_id: &SessionId) -> Result<(), KernelError> {
322        self.with_session_mut(session_id, |session| {
323            session.begin_draining()?;
324            Ok(())
325        })
326    }
327
328    /// Close a session and clear transient session-scoped state.
329    pub fn close_session(&self, session_id: &SessionId) -> Result<(), KernelError> {
330        self.with_session_mut(session_id, |session| {
331            session.close()?;
332            Ok(())
333        })
334    }
335
336    /// Inspect an existing session.
337    pub fn session(&self, session_id: &SessionId) -> Option<Session> {
338        self.with_sessions_read(|sessions| Ok(sessions.get(session_id).cloned()))
339            .ok()
340            .flatten()
341    }
342
343    pub fn session_count(&self) -> usize {
344        self.with_sessions_read(|sessions| Ok(sessions.len()))
345            .unwrap_or(0)
346    }
347
348    pub fn resource_provider_count(&self) -> usize {
349        self.resource_providers.len()
350    }
351
352    pub fn prompt_provider_count(&self) -> usize {
353        self.prompt_providers.len()
354    }
355
356    /// Validate a session-scoped operation and register it as in flight.
357    pub fn begin_session_request(
358        &self,
359        context: &OperationContext,
360        operation_kind: OperationKind,
361        cancellable: bool,
362    ) -> Result<(), KernelError> {
363        self.with_sessions_write(|sessions| {
364            begin_session_request_in_sessions(sessions, context, operation_kind, cancellable)
365        })?;
366        let session_snapshot = self
367            .session(&context.session_id)
368            .ok_or_else(|| KernelError::UnknownSession(context.session_id.clone()))?;
369        self.persist_request_lineage_snapshot(&session_snapshot, &context.request_id)
370    }
371
372    /// Construct and register a child request under an existing parent request.
373    pub fn begin_child_request(
374        &self,
375        parent_context: &OperationContext,
376        request_id: RequestId,
377        operation_kind: OperationKind,
378        progress_token: Option<ProgressToken>,
379        cancellable: bool,
380    ) -> Result<OperationContext, KernelError> {
381        let child_context = self.with_sessions_write(|sessions| {
382            begin_child_request_in_sessions(
383                sessions,
384                parent_context,
385                request_id,
386                operation_kind,
387                progress_token,
388                cancellable,
389            )
390        })?;
391        let session_snapshot = self
392            .session(&child_context.session_id)
393            .ok_or_else(|| KernelError::UnknownSession(child_context.session_id.clone()))?;
394        self.persist_request_lineage_snapshot(&session_snapshot, &child_context.request_id)?;
395        Ok(child_context)
396    }
397
398    /// Complete an in-flight session request.
399    pub fn complete_session_request(
400        &self,
401        session_id: &SessionId,
402        request_id: &RequestId,
403    ) -> Result<(), KernelError> {
404        self.complete_session_request_with_terminal_state(
405            session_id,
406            request_id,
407            OperationTerminalState::Completed,
408        )
409    }
410
411    /// Complete an in-flight session request with an explicit terminal state.
412    pub fn complete_session_request_with_terminal_state(
413        &self,
414        session_id: &SessionId,
415        request_id: &RequestId,
416        terminal_state: OperationTerminalState,
417    ) -> Result<(), KernelError> {
418        self.with_sessions_write(|sessions| {
419            complete_session_request_with_terminal_state_in_sessions(
420                sessions,
421                session_id,
422                request_id,
423                terminal_state,
424            )
425        })
426    }
427
428    pub(crate) fn signed_session_anchor_for_session(
429        &self,
430        session: &Session,
431    ) -> Result<chio_core::session::SessionAnchor, KernelError> {
432        let body = chio_core::session::SessionAnchorBody::new(
433            session.session_anchor().id().to_string(),
434            chio_core::session::SessionAnchorContext::new(
435                session.id().clone(),
436                session.agent_id().to_string(),
437                session.auth_context().clone(),
438                chio_core::session::SessionProofBinding::from_auth_context(session.auth_context()),
439            ),
440            session.session_anchor().auth_epoch(),
441            session.session_anchor().issued_at(),
442            self.config.keypair.public_key(),
443        )
444        .map_err(|error| {
445            KernelError::Internal(format!("failed to build session anchor body: {error}"))
446        })?;
447
448        chio_core::session::SessionAnchor::sign(body, &self.config.keypair).map_err(|error| {
449            KernelError::Internal(format!("failed to sign session anchor: {error}"))
450        })
451    }
452
453    fn persist_session_anchor_snapshot(
454        &self,
455        session: &Session,
456        supersedes_anchor_id: Option<&str>,
457    ) -> Result<(), KernelError> {
458        let anchor = self.signed_session_anchor_for_session(session)?;
459        let anchor_json = serde_json::to_value(&anchor).map_err(|error| {
460            KernelError::Internal(format!("failed to serialize session anchor: {error}"))
461        })?;
462        self.with_receipt_store(|store| {
463            Ok(store.record_session_anchor(
464                session.id().as_str(),
465                &anchor.id,
466                &anchor.auth_context_hash,
467                anchor.issued_at,
468                supersedes_anchor_id,
469                &anchor_json,
470            )?)
471        })?;
472        Ok(())
473    }
474
475    fn persist_request_lineage_snapshot(
476        &self,
477        session: &Session,
478        request_id: &RequestId,
479    ) -> Result<(), KernelError> {
480        let Some(local_lineage) = session.request_lineage(request_id) else {
481            return Ok(());
482        };
483
484        let anchor = self.signed_session_anchor_for_session(session)?;
485        let anchor_reference = anchor.reference().map_err(|error| {
486            KernelError::Internal(format!(
487                "failed to derive session anchor reference: {error}"
488            ))
489        })?;
490        let lineage_mode = if local_lineage.parent_request_id.is_some() {
491            chio_core::session::RequestLineageMode::LocalChild
492        } else {
493            chio_core::session::RequestLineageMode::Root
494        };
495        let mut lineage_record = chio_core::session::RequestLineageRecord::new(
496            local_lineage.request_id.clone(),
497            anchor_reference,
498            local_lineage.operation_kind,
499            lineage_mode,
500            local_lineage.started_at,
501        );
502        if let Some(parent_request_id) = local_lineage.parent_request_id.clone() {
503            lineage_record = lineage_record.with_parent_request_id(parent_request_id);
504        }
505        let lineage_json = serde_json::to_value(&lineage_record).map_err(|error| {
506            KernelError::Internal(format!("failed to serialize request lineage: {error}"))
507        })?;
508        self.with_receipt_store(|store| {
509            Ok(store.record_request_lineage(
510                session.id().as_str(),
511                local_lineage.request_id.as_str(),
512                local_lineage
513                    .parent_request_id
514                    .as_ref()
515                    .map(|value| value.as_str()),
516                Some(anchor.id.as_str()),
517                local_lineage.started_at,
518                None,
519                &lineage_json,
520            )?)
521        })?;
522        Ok(())
523    }
524
525    /// Mark an in-flight session request as cancelled.
526    pub fn request_session_cancellation(
527        &self,
528        session_id: &SessionId,
529        request_id: &RequestId,
530    ) -> Result<(), KernelError> {
531        self.with_session_mut(session_id, |session| {
532            session
533                .request_cancellation(request_id)
534                .map_err(KernelError::from)
535        })
536    }
537
538    /// Validate whether a sampling child request is allowed for this session.
539    pub fn validate_sampling_request(
540        &self,
541        context: &OperationContext,
542        operation: &CreateMessageOperation,
543    ) -> Result<(), KernelError> {
544        self.with_sessions_read(|sessions| {
545            validate_sampling_request_in_sessions(
546                sessions,
547                self.config.allow_sampling,
548                self.config.allow_sampling_tool_use,
549                context,
550                operation,
551            )
552        })
553    }
554
555    /// Validate whether an elicitation child request is allowed for this session.
556    pub fn validate_elicitation_request(
557        &self,
558        context: &OperationContext,
559        operation: &CreateElicitationOperation,
560    ) -> Result<(), KernelError> {
561        self.with_sessions_read(|sessions| {
562            validate_elicitation_request_in_sessions(
563                sessions,
564                self.config.allow_elicitation,
565                context,
566                operation,
567            )
568        })
569    }
570
571    /// Evaluate a session-scoped tool call while allowing the target tool server to proxy
572    /// negotiated nested flows back through a client transport owned by the edge.
573    pub fn evaluate_tool_call_operation_with_nested_flow_client<C: NestedFlowClient>(
574        &self,
575        context: &OperationContext,
576        operation: &ToolCallOperation,
577        client: &mut C,
578    ) -> Result<ToolCallResponse, KernelError> {
579        self.validate_web3_evidence_prerequisites()?;
580        self.begin_session_request(context, OperationKind::ToolCall, true)?;
581
582        let request = ToolCallRequest {
583            request_id: context.request_id.to_string(),
584            capability: operation.capability.clone(),
585            tool_name: operation.tool_name.clone(),
586            server_id: operation.server_id.clone(),
587            agent_id: context.agent_id.clone(),
588            arguments: operation.arguments.clone(),
589            dpop_proof: None,
590            governed_intent: None,
591            approval_token: None,
592            model_metadata: operation.model_metadata.clone(),
593            federated_origin_kernel_id: None,
594        };
595
596        let result = self.evaluate_tool_call_with_nested_flow_client(context, &request, client);
597        let terminal_state = match &result {
598            Ok(response) => response.terminal_state.clone(),
599            Err(KernelError::RequestCancelled { request_id, reason })
600                if request_id == &context.request_id =>
601            {
602                self.with_session_mut(&context.session_id, |session| {
603                    session.request_cancellation(&context.request_id)?;
604                    Ok(())
605                })?;
606                OperationTerminalState::Cancelled {
607                    reason: reason.clone(),
608                }
609            }
610            _ => OperationTerminalState::Completed,
611        };
612        self.complete_session_request_with_terminal_state(
613            &context.session_id,
614            &context.request_id,
615            terminal_state,
616        )?;
617        result
618    }
619
620    /// Evaluate a normalized operation against a specific session.
621    ///
622    /// This is the higher-level entry point that future JSON-RPC or MCP edges
623    /// should target. The current stdio loop normalizes raw frames into these
624    /// operations before invoking the kernel.
625    pub fn evaluate_session_operation(
626        &self,
627        context: &OperationContext,
628        operation: &SessionOperation,
629    ) -> Result<SessionOperationResponse, KernelError> {
630        // Phase 1.5: install tenant_id scope for the duration of this
631        // session-scoped evaluation so every receipt signed here (tool
632        // call, resource read deny, etc.) is tagged with the session's
633        // tenant. The ToolCall branch also installs a scope via its
634        // sync_with_session_context path; the nested scope is a no-op
635        // because the value matches, but it keeps non-tool-call branches
636        // (e.g. evaluate_resource_read) covered.
637        let tenant_id = self.resolve_tenant_id_for_session(Some(&context.session_id));
638        let _tenant_scope = scope_receipt_tenant_id(tenant_id);
639
640        self.validate_web3_evidence_prerequisites()?;
641        let operation_kind = operation.kind();
642        let should_track_inflight = matches!(
643            operation,
644            SessionOperation::ToolCall(_)
645                | SessionOperation::ReadResource(_)
646                | SessionOperation::GetPrompt(_)
647                | SessionOperation::Complete(_)
648        );
649
650        if should_track_inflight {
651            self.begin_session_request(context, operation_kind, true)?;
652        } else {
653            self.with_session_mut(&context.session_id, |session| {
654                session.validate_context(context)?;
655                session.ensure_operation_allowed(operation_kind)?;
656                Ok(())
657            })?;
658        }
659
660        let evaluation = match operation {
661            SessionOperation::ToolCall(tool_call) => {
662                let request = ToolCallRequest {
663                    request_id: context.request_id.to_string(),
664                    capability: tool_call.capability.clone(),
665                    tool_name: tool_call.tool_name.clone(),
666                    server_id: tool_call.server_id.clone(),
667                    agent_id: context.agent_id.clone(),
668                    arguments: tool_call.arguments.clone(),
669                    dpop_proof: None,
670                    governed_intent: None,
671                    approval_token: None,
672                    model_metadata: tool_call.model_metadata.clone(),
673                    federated_origin_kernel_id: None,
674                };
675                let session_roots =
676                    self.session_enforceable_filesystem_root_paths_owned(&context.session_id)?;
677
678                // Phase 1.5: pass the session_id so the evaluate path can
679                // resolve tenant_id from session.auth_context for every
680                // receipt signed during this tool call.
681                self.evaluate_tool_call_sync_with_session_context(
682                    &request,
683                    Some(session_roots.as_slice()),
684                    None,
685                    Some(&context.session_id),
686                )
687                .map(SessionOperationResponse::ToolCall)
688            }
689            SessionOperation::CreateMessage(_) => Err(KernelError::Internal(
690                "sampling/createMessage must be evaluated by an MCP edge with a client transport"
691                    .to_string(),
692            )),
693            SessionOperation::CreateElicitation(_) => Err(KernelError::Internal(
694                "elicitation/create must be evaluated by an MCP edge with a client transport"
695                    .to_string(),
696            )),
697            SessionOperation::ListRoots => {
698                let roots = self
699                    .session(&context.session_id)
700                    .ok_or_else(|| KernelError::UnknownSession(context.session_id.clone()))?
701                    .roots()
702                    .to_vec();
703                Ok(SessionOperationResponse::RootList { roots })
704            }
705            SessionOperation::ListResources => {
706                let resources = self
707                    .list_resources_for_session(&context.session_id)?
708                    .into_iter()
709                    .collect();
710                Ok(SessionOperationResponse::ResourceList { resources })
711            }
712            SessionOperation::ReadResource(resource_read) => {
713                self.evaluate_resource_read(context, resource_read)
714            }
715            SessionOperation::ListResourceTemplates => {
716                let templates = self.list_resource_templates_for_session(&context.session_id)?;
717                Ok(SessionOperationResponse::ResourceTemplateList { templates })
718            }
719            SessionOperation::ListPrompts => {
720                let prompts = self.list_prompts_for_session(&context.session_id)?;
721                Ok(SessionOperationResponse::PromptList { prompts })
722            }
723            SessionOperation::GetPrompt(prompt_get) => self
724                .evaluate_prompt_get(context, prompt_get)
725                .map(|prompt| SessionOperationResponse::PromptGet { prompt }),
726            SessionOperation::Complete(complete) => self
727                .evaluate_completion(context, complete)
728                .map(|completion| SessionOperationResponse::Completion { completion }),
729            SessionOperation::ListCapabilities => {
730                let capabilities = self
731                    .session(&context.session_id)
732                    .ok_or_else(|| KernelError::UnknownSession(context.session_id.clone()))?
733                    .capabilities()
734                    .to_vec();
735
736                Ok(SessionOperationResponse::CapabilityList { capabilities })
737            }
738            SessionOperation::Heartbeat => Ok(SessionOperationResponse::Heartbeat),
739        };
740
741        if should_track_inflight {
742            let terminal_state = match &evaluation {
743                Ok(SessionOperationResponse::ToolCall(response)) => response.terminal_state.clone(),
744                _ => OperationTerminalState::Completed,
745            };
746            self.complete_session_request_with_terminal_state(
747                &context.session_id,
748                &context.request_id,
749                terminal_state,
750            )?;
751        }
752
753        evaluation
754    }
755
756    pub(crate) fn list_resources_for_session(
757        &self,
758        session_id: &SessionId,
759    ) -> Result<Vec<ResourceDefinition>, KernelError> {
760        let session = self
761            .session(session_id)
762            .ok_or_else(|| KernelError::UnknownSession(session_id.clone()))?;
763
764        let mut resources = Vec::new();
765        for provider in &self.resource_providers {
766            resources.extend(provider.list_resources().into_iter().filter(|resource| {
767                session.capabilities().iter().any(|capability| {
768                    capability_matches_resource_request(capability, &resource.uri).unwrap_or(false)
769                })
770            }));
771        }
772
773        Ok(resources)
774    }
775
776    pub(crate) fn resource_exists(&self, uri: &str) -> Result<bool, KernelError> {
777        for provider in &self.resource_providers {
778            if provider
779                .list_resources()
780                .iter()
781                .any(|resource| resource.uri == uri)
782            {
783                return Ok(true);
784            }
785
786            if provider.read_resource(uri)?.is_some() {
787                return Ok(true);
788            }
789        }
790
791        Ok(false)
792    }
793
794    pub(crate) fn list_resource_templates_for_session(
795        &self,
796        session_id: &SessionId,
797    ) -> Result<Vec<ResourceTemplateDefinition>, KernelError> {
798        let session = self
799            .session(session_id)
800            .ok_or_else(|| KernelError::UnknownSession(session_id.clone()))?;
801
802        let mut templates = Vec::new();
803        for provider in &self.resource_providers {
804            templates.extend(
805                provider
806                    .list_resource_templates()
807                    .into_iter()
808                    .filter(|template| {
809                        session.capabilities().iter().any(|capability| {
810                            capability_matches_resource_pattern(capability, &template.uri_template)
811                                .unwrap_or(false)
812                        })
813                    }),
814            );
815        }
816
817        Ok(templates)
818    }
819
820    pub(crate) fn evaluate_resource_read(
821        &self,
822        context: &OperationContext,
823        operation: &ReadResourceOperation,
824    ) -> Result<SessionOperationResponse, KernelError> {
825        self.validate_non_tool_capability(&operation.capability, &context.agent_id)?;
826
827        if !capability_matches_resource_request(&operation.capability, &operation.uri)? {
828            return Err(KernelError::OutOfScopeResource {
829                uri: operation.uri.clone(),
830            });
831        }
832
833        match self.enforce_resource_roots(context, operation) {
834            Ok(()) => {}
835            Err(KernelError::ResourceRootDenied { reason, .. }) => {
836                let receipt = self.build_resource_read_deny_receipt(operation, &reason)?;
837                return Ok(SessionOperationResponse::ResourceReadDenied { receipt });
838            }
839            Err(error) => return Err(error),
840        }
841
842        for provider in &self.resource_providers {
843            if let Some(contents) = provider.read_resource(&operation.uri)? {
844                return Ok(SessionOperationResponse::ResourceRead { contents });
845            }
846        }
847
848        Err(KernelError::ResourceNotRegistered(operation.uri.clone()))
849    }
850
851    pub(crate) fn list_prompts_for_session(
852        &self,
853        session_id: &SessionId,
854    ) -> Result<Vec<PromptDefinition>, KernelError> {
855        let session = self
856            .session(session_id)
857            .ok_or_else(|| KernelError::UnknownSession(session_id.clone()))?;
858
859        let mut prompts = Vec::new();
860        for provider in &self.prompt_providers {
861            prompts.extend(provider.list_prompts().into_iter().filter(|prompt| {
862                session.capabilities().iter().any(|capability| {
863                    capability_matches_prompt_request(capability, &prompt.name).unwrap_or(false)
864                })
865            }));
866        }
867
868        Ok(prompts)
869    }
870
871    pub(crate) fn evaluate_prompt_get(
872        &self,
873        context: &OperationContext,
874        operation: &GetPromptOperation,
875    ) -> Result<PromptResult, KernelError> {
876        self.validate_non_tool_capability(&operation.capability, &context.agent_id)?;
877
878        if !capability_matches_prompt_request(&operation.capability, &operation.prompt_name)? {
879            return Err(KernelError::OutOfScopePrompt {
880                prompt: operation.prompt_name.clone(),
881            });
882        }
883
884        for provider in &self.prompt_providers {
885            if let Some(prompt) =
886                provider.get_prompt(&operation.prompt_name, operation.arguments.clone())?
887            {
888                return Ok(prompt);
889            }
890        }
891
892        Err(KernelError::PromptNotRegistered(
893            operation.prompt_name.clone(),
894        ))
895    }
896
897    pub(crate) fn evaluate_completion(
898        &self,
899        context: &OperationContext,
900        operation: &CompleteOperation,
901    ) -> Result<CompletionResult, KernelError> {
902        self.validate_non_tool_capability(&operation.capability, &context.agent_id)?;
903
904        match &operation.reference {
905            CompletionReference::Prompt { name } => {
906                if !capability_matches_prompt_request(&operation.capability, name)? {
907                    return Err(KernelError::OutOfScopePrompt {
908                        prompt: name.clone(),
909                    });
910                }
911
912                for provider in &self.prompt_providers {
913                    if let Some(completion) = provider.complete_prompt_argument(
914                        name,
915                        &operation.argument.name,
916                        &operation.argument.value,
917                        &operation.context_arguments,
918                    )? {
919                        return Ok(completion);
920                    }
921                }
922
923                Err(KernelError::PromptNotRegistered(name.clone()))
924            }
925            CompletionReference::Resource { uri } => {
926                if !capability_matches_resource_pattern(&operation.capability, uri)? {
927                    return Err(KernelError::OutOfScopeResource { uri: uri.clone() });
928                }
929
930                for provider in &self.resource_providers {
931                    if let Some(completion) = provider.complete_resource_argument(
932                        uri,
933                        &operation.argument.name,
934                        &operation.argument.value,
935                        &operation.context_arguments,
936                    )? {
937                        return Ok(completion);
938                    }
939                }
940
941                Err(KernelError::ResourceNotRegistered(uri.clone()))
942            }
943        }
944    }
945}