Skip to main content

rain_engine_runtime/
lib.rs

1//! Reference HTTP runtime for RainEngine.
2//!
3//! The runtime owns request parsing and repeated calls to `AgentEngine::advance`
4//! until a terminal, suspended, delegated, or policy-stopped outcome is reached.
5
6use async_trait::async_trait;
7use axum::{
8    Json, Router,
9    body::to_bytes,
10    extract::{Path, Query, Request, State},
11    http::{StatusCode, header::CONTENT_TYPE},
12    response::{
13        IntoResponse,
14        sse::{Event, KeepAlive, Sse},
15    },
16    routing::{get, post},
17};
18use futures_util::stream;
19use rain_engine_blob::{BlobBackendConfig, build_blob_store};
20use rain_engine_core::{
21    AdvanceRequest, AgentEngine, AgentStateSnapshot, AgentTrigger, ApprovalDecision, AttachmentRef,
22    BlobStore, ContinueRequest, CorrelationId, EngineError, EngineOutcome, EnginePolicy,
23    InMemoryMemoryStore, LlmProvider, MemoryStore, MockLlmProvider, MultimodalPayload, NativeSkill,
24    PendingApprovalRecord, ProcessRequest, ProviderRequestConfig, RecordPageQuery, ResourcePolicy,
25    SessionListQuery, SessionRecord, SessionSnapshot, SkillCapability, SkillDefinition,
26    SkillExecutionError, SkillFailureKind, SkillInvocation, SkillManifest, SkillStore, StopReason,
27    ToolCallRecord, ToolResultRecord, WakeId, unix_time_ms,
28};
29use rain_engine_openai::{OpenAiCompatibleConfig, OpenAiCompatibleProvider};
30use rain_engine_provider_gemini::{GeminiAuth, GeminiConfig, GeminiProvider};
31use rain_engine_store_pg::PgMemoryStore;
32use rain_engine_store_sqlite::SqliteMemoryStore;
33use serde::{Deserialize, Serialize, de::DeserializeOwned};
34use serde_json::Value;
35use std::collections::{BTreeMap, BTreeSet, HashSet};
36use std::net::SocketAddr;
37use std::sync::Arc;
38use std::time::Duration;
39use thiserror::Error;
40use tracing::{error, info};
41use uuid::Uuid;
42
43const MAX_INGRESS_BODY_BYTES: usize = 64 * 1024 * 1024;
44
45#[typeshare::typeshare]
46#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
47pub struct WebhookIngressRequest {
48    pub session_id: String,
49    pub payload: Value,
50    #[serde(default)]
51    pub attachments: Vec<MultimodalPayload>,
52    #[serde(default)]
53    pub granted_scopes: BTreeSet<String>,
54    #[serde(default)]
55    pub idempotency_key: Option<String>,
56    #[serde(default)]
57    pub provider: Option<ProviderRequestConfig>,
58    #[serde(default)]
59    pub policy_override: Option<EnginePolicy>,
60}
61
62#[typeshare::typeshare]
63#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
64pub struct ApprovalIngressRequest {
65    pub session_id: String,
66    pub resume_token: String,
67    pub decision: ApprovalDecision,
68    #[serde(default)]
69    pub metadata: Value,
70    #[serde(default)]
71    pub granted_scopes: BTreeSet<String>,
72    #[serde(default)]
73    pub provider: Option<ProviderRequestConfig>,
74    #[serde(default)]
75    pub policy_override: Option<EnginePolicy>,
76}
77
78#[typeshare::typeshare]
79#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
80pub struct EventIngressRequest {
81    pub session_id: String,
82    pub payload: Value,
83    #[serde(default)]
84    pub attachments: Vec<MultimodalPayload>,
85    #[serde(default)]
86    pub granted_scopes: BTreeSet<String>,
87    #[serde(default)]
88    pub idempotency_key: Option<String>,
89    #[serde(default)]
90    pub provider: Option<ProviderRequestConfig>,
91    #[serde(default)]
92    pub policy_override: Option<EnginePolicy>,
93}
94
95#[typeshare::typeshare]
96#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
97pub struct HumanInputIngressRequest {
98    pub session_id: String,
99    pub content: String,
100    #[serde(default)]
101    pub attachments: Vec<MultimodalPayload>,
102    #[serde(default)]
103    pub granted_scopes: BTreeSet<String>,
104    #[serde(default)]
105    pub idempotency_key: Option<String>,
106    #[serde(default)]
107    pub provider: Option<ProviderRequestConfig>,
108    #[serde(default)]
109    pub policy_override: Option<EnginePolicy>,
110}
111
112#[typeshare::typeshare]
113#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
114pub struct ScheduledWakeIngressRequest {
115    pub session_id: String,
116    pub wake_id: String,
117    pub due_at: std::time::SystemTime,
118    pub reason: String,
119    #[serde(default)]
120    pub granted_scopes: BTreeSet<String>,
121    #[serde(default)]
122    pub provider: Option<ProviderRequestConfig>,
123    #[serde(default)]
124    pub policy_override: Option<EnginePolicy>,
125}
126
127#[typeshare::typeshare]
128#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
129pub struct DelegationResultIngressRequest {
130    pub session_id: String,
131    pub correlation_id: String,
132    pub payload: Value,
133    #[serde(default)]
134    pub metadata: Value,
135    #[serde(default)]
136    pub granted_scopes: BTreeSet<String>,
137    #[serde(default)]
138    pub provider: Option<ProviderRequestConfig>,
139    #[serde(default)]
140    pub policy_override: Option<EnginePolicy>,
141}
142
143#[typeshare::typeshare]
144#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
145pub struct InstallSkillRequest {
146    pub manifest: rain_engine_core::SkillManifest,
147    pub wasm_url: Option<String>,
148    pub wasm_base64: Option<String>,
149    pub file_path: Option<String>,
150}
151
152pub struct SkillInstallerSkill {
153    engine: AgentEngine,
154}
155
156impl SkillInstallerSkill {
157    pub fn new(engine: AgentEngine) -> Self {
158        Self { engine }
159    }
160}
161
162pub fn install_skill_manifest() -> SkillManifest {
163    SkillManifest {
164        name: "install_skill".to_string(),
165        description: "Installs a new WASM-based skill from a URL.".to_string(),
166        input_schema: serde_json::json!({
167            "type": "object",
168            "properties": {
169                "name": { "type": "string" },
170                "description": { "type": "string" },
171                "wasm_url": { "type": "string" },
172                "input_schema": { "type": "object" }
173            },
174            "required": ["name", "description", "wasm_url", "input_schema"]
175        }),
176        required_scopes: vec!["operator:skills".to_string()],
177        capability_grants: vec![],
178        resource_policy: ResourcePolicy::default_for_tools(),
179        approval_required: true,
180        circuit_breaker_threshold: 0.5,
181    }
182}
183
184#[async_trait]
185impl NativeSkill for SkillInstallerSkill {
186    async fn execute(&self, invocation: SkillInvocation) -> Result<Value, SkillExecutionError> {
187        let name = invocation
188            .args
189            .get("name")
190            .and_then(|v| v.as_str())
191            .ok_or_else(|| {
192                SkillExecutionError::new(SkillFailureKind::InvalidArguments, "missing name")
193            })?;
194        let description = invocation
195            .args
196            .get("description")
197            .and_then(|v| v.as_str())
198            .ok_or_else(|| {
199                SkillExecutionError::new(SkillFailureKind::InvalidArguments, "missing description")
200            })?;
201        let wasm_url = invocation.args.get("wasm_url").and_then(|v| v.as_str());
202        let wasm_base64 = invocation.args.get("wasm_base64").and_then(|v| v.as_str());
203        let file_path = invocation.args.get("file_path").and_then(|v| v.as_str());
204
205        if wasm_url.is_none() && wasm_base64.is_none() && file_path.is_none() {
206            return Err(SkillExecutionError::new(
207                SkillFailureKind::InvalidArguments,
208                "missing wasm_url, wasm_base64, or file_path",
209            ));
210        }
211
212        let input_schema = invocation
213            .args
214            .get("input_schema")
215            .cloned()
216            .ok_or_else(|| {
217                SkillExecutionError::new(SkillFailureKind::InvalidArguments, "missing input_schema")
218            })?;
219
220        let manifest = SkillManifest {
221            name: name.to_string(),
222            description: description.to_string(),
223            input_schema,
224            required_scopes: vec!["tool:run".to_string()],
225            capability_grants: vec![
226                SkillCapability::HttpOutbound {
227                    allow_hosts: vec![],
228                },
229                SkillCapability::StructuredLog,
230            ],
231            resource_policy: ResourcePolicy::default_for_tools(),
232            approval_required: false,
233            circuit_breaker_threshold: 0.5,
234        };
235
236        // Reuse the handler logic or just do it here
237        let wasm_bytes = if let Some(url) = wasm_url {
238            let client = reqwest::Client::new();
239            client
240                .get(url)
241                .send()
242                .await
243                .map_err(|err| {
244                    SkillExecutionError::new(
245                        SkillFailureKind::Internal,
246                        format!("Download failed: {}", err),
247                    )
248                })?
249                .bytes()
250                .await
251                .map_err(|err| {
252                    SkillExecutionError::new(
253                        SkillFailureKind::Internal,
254                        format!("Read failed: {}", err),
255                    )
256                })?
257                .to_vec()
258        } else if let Some(b64) = wasm_base64 {
259            use base64::{Engine as _, engine::general_purpose};
260            general_purpose::STANDARD.decode(b64).map_err(|err| {
261                SkillExecutionError::new(
262                    SkillFailureKind::InvalidArguments,
263                    format!("Base64 decode failed: {}", err),
264                )
265            })?
266        } else if let Some(path) = file_path {
267            tokio::fs::read(path).await.map_err(|err| {
268                SkillExecutionError::new(
269                    SkillFailureKind::InvalidArguments,
270                    format!("File read failed: {}", err),
271                )
272            })?
273        } else {
274            return Err(SkillExecutionError::new(
275                SkillFailureKind::InvalidArguments,
276                "missing wasm_url, wasm_base64, or file_path",
277            ));
278        };
279
280        let config = rain_engine_wasm::WasmSkillConfig {
281            manifest: manifest.clone(),
282            wasm_bytes: Arc::new(wasm_bytes.to_vec()),
283            capabilities: Arc::new(
284                rain_engine_wasm::InMemoryCapabilityHost::new().with_http_client(),
285            ),
286        };
287
288        let executor = rain_engine_wasm::WasmSkillExecutor::new(config).map_err(|err| {
289            SkillExecutionError::new(SkillFailureKind::Internal, format!("Init failed: {}", err))
290        })?;
291
292        self.engine
293            .register_wasm_skill_persistent(manifest, Arc::new(executor), wasm_bytes.to_vec())
294            .await
295            .map_err(|err| {
296                SkillExecutionError::new(
297                    SkillFailureKind::Internal,
298                    format!("Storage failed: {err}"),
299                )
300            })?;
301
302        Ok(serde_json::json!({
303            "status": "success",
304            "message": format!("Skill {} installed successfully", name)
305        }))
306    }
307}
308
309#[typeshare::typeshare]
310#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
311pub struct RuntimeServerConfig {
312    pub bind_address: SocketAddr,
313    pub request_timeout_ms: u64,
314    pub default_policy: EnginePolicy,
315    pub allow_policy_overrides: bool,
316    pub allow_provider_overrides: bool,
317    pub default_provider: ProviderRequestConfig,
318    #[serde(default)]
319    pub async_ingress: bool,
320}
321
322#[typeshare::typeshare]
323#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
324#[serde(tag = "kind", rename_all = "snake_case")]
325pub enum StoreBootstrapConfig {
326    InMemory,
327    Sqlite { database_url: String },
328    Postgres { database_url: String },
329}
330
331#[typeshare::typeshare]
332#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
333pub struct CacheBootstrapConfig {
334    pub valkey_url: Option<String>,
335}
336
337pub type BlobBootstrapConfig = BlobBackendConfig;
338
339#[typeshare::typeshare]
340#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
341#[serde(rename_all = "snake_case")]
342pub enum GeminiAuthMode {
343    ApiKey,
344    BearerToken,
345}
346
347#[typeshare::typeshare]
348#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
349#[serde(tag = "kind", rename_all = "snake_case")]
350pub enum ProviderBootstrapConfig {
351    Mock {
352        response: String,
353    },
354    OpenAiCompatible {
355        base_url: String,
356        api_key: String,
357        model: String,
358        #[serde(default)]
359        temperature: Option<f32>,
360        #[serde(default)]
361        max_tokens: Option<u32>,
362        #[serde(default = "default_system_prompt")]
363        system_prompt: String,
364    },
365    Gemini {
366        base_url: String,
367        auth_mode: GeminiAuthMode,
368        credential: String,
369        model: String,
370        #[serde(default)]
371        temperature: Option<f32>,
372        #[serde(default)]
373        max_tokens: Option<u32>,
374        #[serde(default = "default_gemini_system_instruction")]
375        system_instruction: String,
376        #[serde(default = "default_gemini_provider_name")]
377        provider_name: String,
378        #[serde(default = "default_gemini_embedding_model")]
379        embedding_model: String,
380    },
381}
382
383fn default_system_prompt() -> String {
384    "You are a server-side automation agent. Prefer tool calls when available. When replying directly, return plain text or JSON with type=yield.".to_string()
385}
386
387fn default_gemini_system_instruction() -> String {
388    "You are a multimodal server-side automation agent. Use tools when they can complete the task precisely.".to_string()
389}
390
391fn default_gemini_provider_name() -> String {
392    "gemini".to_string()
393}
394
395fn default_gemini_embedding_model() -> String {
396    "text-embedding-004".to_string()
397}
398
399#[typeshare::typeshare]
400#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
401pub struct RuntimeBootstrapConfig {
402    pub server: RuntimeServerConfig,
403    pub store: StoreBootstrapConfig,
404    #[serde(default)]
405    pub cache: Option<CacheBootstrapConfig>,
406    pub blob: BlobBootstrapConfig,
407    pub provider: ProviderBootstrapConfig,
408    #[serde(default)]
409    pub enable_research_planner: bool,
410}
411
412#[derive(Clone)]
413pub struct RuntimeState {
414    engine: AgentEngine,
415    memory: Arc<dyn MemoryStore>,
416    blob_store: Arc<dyn BlobStore>,
417    config: RuntimeServerConfig,
418    provider_kind: String,
419    default_scopes: Vec<String>,
420    channel_statuses: Vec<RuntimeChannelView>,
421    settings: RuntimeMutableSettings,
422    ingress: Option<Arc<rain_engine_ingress::ValkeyStreamIngress>>,
423}
424
425#[typeshare::typeshare]
426#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
427pub struct RuntimeRunResult {
428    pub advances: Vec<rain_engine_core::AdvanceResult>,
429    pub outcome: EngineOutcome,
430}
431
432#[typeshare::typeshare]
433#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
434#[serde(rename_all = "snake_case")]
435pub enum SessionStatus {
436    Empty,
437    Running,
438    Completed,
439    Suspended,
440    Delegated,
441    Stopped,
442    Failed,
443}
444
445#[typeshare::typeshare]
446#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
447#[serde(rename_all = "snake_case")]
448pub enum SessionActivityState {
449    Idle,
450    Reasoning,
451    RunningTools,
452    WaitingHuman,
453    WaitingExternal,
454    Scheduled,
455    Delegated,
456    Errored,
457}
458
459#[typeshare::typeshare]
460#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
461pub struct WakeView {
462    pub wake_id: String,
463    pub reason: String,
464    pub status: String,
465    pub occurred_at_ms: i64,
466    pub due_at_ms: Option<i64>,
467    pub task_id: Option<String>,
468}
469
470#[typeshare::typeshare]
471#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
472pub struct HeartbeatStatusView {
473    pub wake_id: String,
474    pub reason: String,
475    pub occurred_at_ms: i64,
476    pub outcome_summary: Option<String>,
477    pub stop_reason: Option<StopReason>,
478}
479
480#[typeshare::typeshare]
481#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
482pub struct SessionListItemView {
483    pub session_id: String,
484    pub status: SessionStatus,
485    pub activity_state: SessionActivityState,
486    pub current_focus: Option<String>,
487    pub latest_provider: Option<String>,
488    pub last_activity_at_ms: i64,
489    pub pending_approval: bool,
490    pub pending_wake: bool,
491    pub unread_event_count: usize,
492    pub active_channel_ids: Vec<String>,
493    pub record_count: usize,
494}
495
496#[typeshare::typeshare]
497#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
498#[serde(rename_all = "snake_case")]
499pub enum RuntimeChannelStatus {
500    Connected,
501    Degraded,
502    Disabled,
503}
504
505#[typeshare::typeshare]
506#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
507pub struct RuntimeChannelView {
508    pub channel_id: String,
509    pub label: String,
510    pub transport: String,
511    pub status: RuntimeChannelStatus,
512    pub detail: Option<String>,
513}
514
515#[typeshare::typeshare]
516#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
517pub struct ApprovalMetadataSupport {
518    pub structured_json: bool,
519    pub recommended_fields: Vec<String>,
520}
521
522#[typeshare::typeshare]
523#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
524pub struct UploadSupport {
525    pub multipart: bool,
526    pub max_request_bytes: usize,
527}
528
529#[typeshare::typeshare]
530#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
531pub struct ApprovalView {
532    pub resume_token: String,
533    pub created_at_ms: i64,
534    pub trigger_id: String,
535    pub step: usize,
536    pub reason: String,
537    pub pending_calls: Vec<rain_engine_core::PlannedSkillCall>,
538}
539
540#[typeshare::typeshare]
541#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
542pub struct ToolTimelineItem {
543    pub call_id: String,
544    pub skill_name: String,
545    pub step: usize,
546    pub called_at_ms: i64,
547    pub finished_at_ms: Option<i64>,
548    pub backend_kind: String,
549    pub args: Value,
550    pub success: Option<bool>,
551    pub output_preview: Option<String>,
552    pub failure_kind: Option<String>,
553}
554
555#[typeshare::typeshare]
556#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
557pub struct SelfImprovementView {
558    pub active_overlay: Option<rain_engine_core::PolicyOverlay>,
559    pub reflections: Vec<rain_engine_core::ReflectionRecord>,
560    pub policy_tunings: Vec<rain_engine_core::PolicyTuningRecord>,
561    pub strategy_preferences: Vec<rain_engine_core::StrategyPreferenceRecord>,
562    pub tool_performance: Vec<rain_engine_core::ToolPerformanceRecord>,
563    pub profile_patches: Vec<rain_engine_core::ProfilePatchRecord>,
564}
565
566#[typeshare::typeshare]
567#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
568pub struct ExecutionGraphView {
569    pub active_graph: Option<rain_engine_core::ToolExecutionGraph>,
570    pub graphs: Vec<rain_engine_core::ToolExecutionGraph>,
571    pub checkpoints: Vec<rain_engine_core::ToolNodeCheckpointRecord>,
572    pub validations: Vec<rain_engine_core::SkillInputValidationRecord>,
573    pub blocked_call_ids: Vec<String>,
574}
575
576#[typeshare::typeshare]
577#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
578#[serde(tag = "type", content = "payload")]
579pub enum TimelineItem {
580    HumanInput {
581        actor_id: String,
582        content: String,
583        occurred_at_ms: i64,
584    },
585    AssistantResponse {
586        content: String,
587        stop_reason: StopReason,
588        occurred_at_ms: i64,
589    },
590    ToolCall {
591        call_id: String,
592        skill_name: String,
593        formatted_call: String,
594        occurred_at_ms: i64,
595    },
596    ToolResult {
597        call_id: String,
598        skill_name: String,
599        success: bool,
600        preview: String,
601        occurred_at_ms: i64,
602    },
603    ApprovalRequested {
604        resume_token: String,
605        pending_calls: Vec<rain_engine_core::PlannedSkillCall>,
606        occurred_at_ms: i64,
607    },
608    ApprovalResolved {
609        resume_token: String,
610        decision: ApprovalDecision,
611        occurred_at_ms: i64,
612    },
613    Plan {
614        summary: String,
615        candidate_actions: Vec<String>,
616        confidence: f64,
617        outcome: rain_engine_core::DeliberationOutcome,
618        occurred_at_ms: i64,
619    },
620    ToolCheckpoint {
621        call_id: String,
622        skill_name: String,
623        status: rain_engine_core::ToolNodeStatus,
624        attempt: usize,
625        detail: Option<String>,
626        occurred_at_ms: i64,
627    },
628    ValidationFailure {
629        call_id: String,
630        skill_name: String,
631        errors: Vec<String>,
632        occurred_at_ms: i64,
633    },
634    Learning {
635        label: String,
636        detail: String,
637        confidence: f64,
638        occurred_at_ms: i64,
639    },
640    System {
641        label: String,
642        detail: String,
643        occurred_at_ms: i64,
644    },
645}
646
647#[typeshare::typeshare]
648#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
649pub struct SessionView {
650    pub session_id: String,
651    pub status: SessionStatus,
652    pub activity_state: SessionActivityState,
653    pub current_focus: Option<String>,
654    pub current_task_id: Option<String>,
655    pub current_task_title: Option<String>,
656    pub next_wake_at_ms: Option<i64>,
657    pub blocked_reason: Option<String>,
658    pub last_human_input_at_ms: Option<i64>,
659    pub last_assistant_activity_at_ms: Option<i64>,
660    pub active_channel_ids: Vec<String>,
661    pub pending_wake: Option<WakeView>,
662    pub wake_history: Vec<WakeView>,
663    pub last_heartbeat: Option<HeartbeatStatusView>,
664    pub last_sequence_no: Option<i64>,
665    pub latest_outcome: Option<rain_engine_core::OutcomeRecord>,
666    pub pending_approval: Option<ApprovalView>,
667    pub state: AgentStateSnapshot,
668    pub timeline: Vec<TimelineItem>,
669    pub tool_timeline: Vec<ToolTimelineItem>,
670    pub self_improvement: SelfImprovementView,
671    pub execution_graph: ExecutionGraphView,
672    pub record_count: usize,
673    pub total_estimated_cost_usd: f64,
674}
675
676#[typeshare::typeshare]
677#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
678pub struct RuntimeCapabilities {
679    pub version: String,
680    pub provider_kind: String,
681    pub default_model: Option<String>,
682    pub streaming: bool,
683    pub approvals: bool,
684    pub multipart_uploads: bool,
685    pub default_scopes: Vec<String>,
686    pub default_policy: EnginePolicy,
687    pub channels: Vec<RuntimeChannelView>,
688    pub approval_metadata: ApprovalMetadataSupport,
689    pub wake_support: bool,
690    pub delegation_support: bool,
691    pub learning_support: bool,
692    pub upload_limits: UploadSupport,
693    pub skills: Vec<SkillDefinition>,
694}
695
696#[typeshare::typeshare]
697#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
698pub struct RuntimeSettingsView {
699    pub shell_exec: MutableSkillPolicyView,
700    pub http_fetch: MutableSkillPolicyView,
701    pub web_reader: MutableSkillPolicyView,
702    pub engine_policy: rain_engine_core::EnginePolicy,
703    pub provider_config: rain_engine_core::ProviderRequestConfig,
704}
705
706#[typeshare::typeshare]
707#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
708pub struct MutableSkillPolicyView {
709    pub permissive: bool,
710    pub allowlist: Vec<String>,
711    pub timeout_secs: u64,
712}
713
714#[typeshare::typeshare]
715#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Default)]
716pub struct RuntimeSettingsUpdateRequest {
717    #[serde(default)]
718    pub shell_exec: Option<MutableSkillPolicyUpdate>,
719    #[serde(default)]
720    pub http_fetch: Option<MutableSkillPolicyUpdate>,
721    #[serde(default)]
722    pub web_reader: Option<MutableSkillPolicyUpdate>,
723    #[serde(default)]
724    pub engine_policy: Option<rain_engine_core::EnginePolicy>,
725    #[serde(default)]
726    pub provider_config: Option<rain_engine_core::ProviderRequestConfig>,
727}
728
729#[typeshare::typeshare]
730#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Default)]
731pub struct MutableSkillPolicyUpdate {
732    #[serde(default)]
733    pub permissive: Option<bool>,
734    #[serde(default)]
735    pub allowlist: Option<Vec<String>>,
736}
737
738#[derive(Clone)]
739pub struct ManagedSkillPolicy {
740    pub access: rain_engine_skills::SharedAccessPolicy,
741    pub timeout: Duration,
742}
743
744impl ManagedSkillPolicy {
745    pub fn new(access: rain_engine_skills::SharedAccessPolicy, timeout: Duration) -> Self {
746        Self { access, timeout }
747    }
748
749    async fn to_view(&self) -> MutableSkillPolicyView {
750        let policy = self.access.read().await;
751        let mut allowlist = policy.allowlist.iter().cloned().collect::<Vec<_>>();
752        allowlist.sort();
753        MutableSkillPolicyView {
754            permissive: policy.permissive,
755            allowlist,
756            timeout_secs: self.timeout.as_secs(),
757        }
758    }
759
760    async fn apply(&self, update: MutableSkillPolicyUpdate) {
761        let mut policy = self.access.write().await;
762        if let Some(permissive) = update.permissive {
763            policy.permissive = permissive;
764        }
765        if let Some(allowlist) = update.allowlist {
766            policy.allowlist = normalize_allowlist(allowlist);
767        }
768    }
769}
770
771#[derive(Clone)]
772pub struct RuntimeMutableSettings {
773    pub shell_exec: ManagedSkillPolicy,
774    pub http_fetch: ManagedSkillPolicy,
775    pub web_reader: ManagedSkillPolicy,
776    pub engine_policy: std::sync::Arc<tokio::sync::RwLock<rain_engine_core::EnginePolicy>>,
777    pub provider_config:
778        std::sync::Arc<tokio::sync::RwLock<rain_engine_core::ProviderRequestConfig>>,
779}
780
781impl RuntimeMutableSettings {
782    pub fn defaults(
783        engine_policy: rain_engine_core::EnginePolicy,
784        provider_config: rain_engine_core::ProviderRequestConfig,
785    ) -> Self {
786        let timeout = Duration::from_secs(30);
787        Self {
788            shell_exec: ManagedSkillPolicy::new(
789                rain_engine_skills::shared_access_policy(HashSet::new(), false),
790                timeout,
791            ),
792            http_fetch: ManagedSkillPolicy::new(
793                rain_engine_skills::shared_access_policy(HashSet::new(), false),
794                timeout,
795            ),
796            web_reader: ManagedSkillPolicy::new(
797                rain_engine_skills::shared_access_policy(HashSet::new(), false),
798                timeout,
799            ),
800            engine_policy: std::sync::Arc::new(tokio::sync::RwLock::new(engine_policy)),
801            provider_config: std::sync::Arc::new(tokio::sync::RwLock::new(provider_config)),
802        }
803    }
804
805    async fn to_view(&self) -> RuntimeSettingsView {
806        let engine_policy = self.engine_policy.read().await.clone();
807        let provider_config = self.provider_config.read().await.clone();
808        RuntimeSettingsView {
809            shell_exec: self.shell_exec.to_view().await,
810            http_fetch: self.http_fetch.to_view().await,
811            web_reader: self.web_reader.to_view().await,
812            engine_policy,
813            provider_config,
814        }
815    }
816
817    async fn apply(&self, update: RuntimeSettingsUpdateRequest) {
818        if let Some(shell_exec) = update.shell_exec {
819            self.shell_exec.apply(shell_exec).await;
820        }
821        if let Some(http_fetch) = update.http_fetch {
822            self.http_fetch.apply(http_fetch).await;
823        }
824        if let Some(web_reader) = update.web_reader {
825            self.web_reader.apply(web_reader).await;
826        }
827        if let Some(engine_policy) = update.engine_policy {
828            *self.engine_policy.write().await = engine_policy;
829        }
830        if let Some(provider_config) = update.provider_config {
831            *self.provider_config.write().await = provider_config;
832        }
833    }
834}
835
836impl RuntimeState {
837    pub fn new(
838        engine: AgentEngine,
839        memory: Arc<dyn MemoryStore>,
840        blob_store: Arc<dyn BlobStore>,
841        config: RuntimeServerConfig,
842        settings: RuntimeMutableSettings,
843    ) -> Self {
844        Self {
845            engine,
846            memory,
847            blob_store,
848            config,
849            provider_kind: "custom".to_string(),
850            default_scopes: vec!["tool:run".to_string()],
851            channel_statuses: Vec::new(),
852            settings,
853            ingress: None,
854        }
855    }
856
857    pub fn with_provider_kind(mut self, provider_kind: impl Into<String>) -> Self {
858        self.provider_kind = provider_kind.into();
859        self
860    }
861
862    pub fn with_default_scopes(mut self, default_scopes: Vec<String>) -> Self {
863        self.default_scopes = default_scopes;
864        self
865    }
866
867    pub fn with_channel_statuses(mut self, channel_statuses: Vec<RuntimeChannelView>) -> Self {
868        self.channel_statuses = channel_statuses;
869        self
870    }
871
872    pub fn with_settings(mut self, settings: RuntimeMutableSettings) -> Self {
873        self.settings = settings;
874        self
875    }
876
877    pub fn with_ingress(mut self, ingress: rain_engine_ingress::ValkeyStreamIngress) -> Self {
878        self.ingress = Some(Arc::new(ingress));
879        self
880    }
881
882    pub fn engine(&self) -> &AgentEngine {
883        &self.engine
884    }
885
886    pub fn memory(&self) -> Arc<dyn MemoryStore> {
887        self.memory.clone()
888    }
889
890    pub fn blob_store(&self) -> Arc<dyn BlobStore> {
891        self.blob_store.clone()
892    }
893
894    pub fn settings(&self) -> RuntimeMutableSettings {
895        self.settings.clone()
896    }
897}
898
899fn normalize_allowlist(values: Vec<String>) -> HashSet<String> {
900    values
901        .into_iter()
902        .map(|value| value.trim().to_string())
903        .filter(|value| !value.is_empty())
904        .collect()
905}
906
907#[derive(Debug, Error)]
908pub enum RuntimeConfigError {
909    #[error("{0}")]
910    Invalid(String),
911}
912
913#[derive(Debug, Error)]
914enum IngressError {
915    #[error("unsupported content type")]
916    UnsupportedContentType,
917    #[error("malformed request: {0}")]
918    Malformed(String),
919    #[error("blob storage failed: {0}")]
920    Blob(String),
921}
922
923pub async fn build_runtime_state(
924    config: RuntimeBootstrapConfig,
925) -> Result<RuntimeState, RuntimeConfigError> {
926    let provider_kind = match &config.provider {
927        ProviderBootstrapConfig::Mock { .. } => "mock",
928        ProviderBootstrapConfig::OpenAiCompatible { .. } => "openai_compatible",
929        ProviderBootstrapConfig::Gemini { .. } => "gemini",
930    }
931    .to_string();
932
933    let (memory, skill_store): (Arc<dyn MemoryStore>, Option<Arc<dyn SkillStore>>) =
934        match &config.store {
935            StoreBootstrapConfig::InMemory => (
936                Arc::new(InMemoryMemoryStore::new()),
937                Some(Arc::new(rain_engine_core::InMemorySkillStore::new()) as Arc<dyn SkillStore>),
938            ),
939            StoreBootstrapConfig::Sqlite { database_url } => {
940                if database_url.trim().is_empty() {
941                    return Err(RuntimeConfigError::Invalid(
942                        "sqlite database_url must not be empty".to_string(),
943                    ));
944                }
945                let store = Arc::new(
946                    SqliteMemoryStore::connect(database_url)
947                        .await
948                        .map_err(|err| RuntimeConfigError::Invalid(err.message))?,
949                );
950                (
951                    store.clone() as Arc<dyn MemoryStore>,
952                    Some(store as Arc<dyn SkillStore>),
953                )
954            }
955            StoreBootstrapConfig::Postgres { database_url } => {
956                if database_url.trim().is_empty() {
957                    return Err(RuntimeConfigError::Invalid(
958                        "postgres database_url must not be empty".to_string(),
959                    ));
960                }
961                (
962                    Arc::new(
963                        PgMemoryStore::connect_lazy(database_url)
964                            .map_err(|err| RuntimeConfigError::Invalid(err.message))?,
965                    ),
966                    None,
967                )
968            }
969        };
970
971    let blob_store: Arc<dyn BlobStore> = Arc::from(
972        build_blob_store(&config.blob).map_err(|err| RuntimeConfigError::Invalid(err.message))?,
973    );
974
975    let llm: Arc<dyn LlmProvider> = match &config.provider {
976        ProviderBootstrapConfig::Mock { response } => {
977            let response_str = response.clone();
978            Arc::new(MockLlmProvider::dynamic(move |_| {
979                Ok(rain_engine_core::ProviderDecision {
980                    action: rain_engine_core::AgentAction::Respond {
981                        content: response_str.clone(),
982                    },
983                    usage: None,
984                    cache: None,
985                })
986            }))
987        }
988        ProviderBootstrapConfig::OpenAiCompatible {
989            base_url,
990            api_key,
991            model,
992            temperature,
993            max_tokens,
994            system_prompt,
995        } => Arc::new(
996            OpenAiCompatibleProvider::new(OpenAiCompatibleConfig {
997                base_url: base_url.clone(),
998                api_key: api_key.clone(),
999                default_request: ProviderRequestConfig {
1000                    model: Some(model.clone()),
1001                    temperature: *temperature,
1002                    max_tokens: *max_tokens,
1003                },
1004                system_prompt: system_prompt.clone(),
1005            })
1006            .map_err(|err| RuntimeConfigError::Invalid(err.to_string()))?,
1007        ),
1008        ProviderBootstrapConfig::Gemini {
1009            base_url,
1010            auth_mode,
1011            credential,
1012            model,
1013            temperature,
1014            max_tokens,
1015            system_instruction,
1016            provider_name,
1017            embedding_model,
1018        } => Arc::new(
1019            GeminiProvider::new(GeminiConfig {
1020                base_url: base_url.clone(),
1021                auth: match auth_mode {
1022                    GeminiAuthMode::ApiKey => GeminiAuth::ApiKey(credential.clone()),
1023                    GeminiAuthMode::BearerToken => GeminiAuth::BearerToken(credential.clone()),
1024                },
1025                default_request: ProviderRequestConfig {
1026                    model: Some(model.clone()),
1027                    temperature: *temperature,
1028                    max_tokens: *max_tokens,
1029                },
1030                system_instruction: system_instruction.clone(),
1031                provider_name: provider_name.clone(),
1032                embedding_model: embedding_model.clone(),
1033            })
1034            .map_err(|err| RuntimeConfigError::Invalid(err.to_string()))?,
1035        ),
1036    };
1037
1038    let mut engine = AgentEngine::new(llm.clone(), memory.clone());
1039
1040    if let Some(cache_config) = &config.cache
1041        && let Some(valkey_url) = &cache_config.valkey_url
1042    {
1043        let valkey_cache = rain_engine_core::ValkeyStateCache::new(valkey_url, "rain")
1044            .map_err(|err| RuntimeConfigError::Invalid(format!("Valkey cache error: {}", err)))?;
1045        engine = engine.with_state_cache(Arc::new(valkey_cache));
1046        tracing::info!("Configured Valkey state projection cache");
1047    }
1048
1049    if let Some(store) = &skill_store {
1050        engine = engine.with_skill_store(store.clone());
1051
1052        // Reload persisted WASM skills
1053        if let Ok(persisted_skills) = store.list_skills().await {
1054            info!("Reloading {} persisted skills...", persisted_skills.len());
1055            for (manifest, wasm_bytes) in persisted_skills {
1056                let config = rain_engine_wasm::WasmSkillConfig {
1057                    manifest: manifest.clone(),
1058                    wasm_bytes: Arc::new(wasm_bytes),
1059                    capabilities: Arc::new(
1060                        rain_engine_wasm::InMemoryCapabilityHost::new().with_http_client(),
1061                    ),
1062                };
1063                match rain_engine_wasm::WasmSkillExecutor::new(config) {
1064                    Ok(executor) => {
1065                        engine.register_wasm_skill(manifest, Arc::new(executor));
1066                    }
1067                    Err(err) => {
1068                        error!("Failed to reload skill {}: {}", manifest.name, err);
1069                    }
1070                }
1071            }
1072        }
1073    }
1074
1075    engine.register_native_skill(
1076        install_skill_manifest(),
1077        Arc::new(SkillInstallerSkill::new(engine.clone())),
1078    );
1079
1080    let settings = RuntimeMutableSettings::defaults(
1081        rain_engine_core::EnginePolicy::default(),
1082        rain_engine_core::ProviderRequestConfig::default(),
1083    );
1084    engine.register_native_skill(
1085        rain_engine_skills::web_reader::manifest(),
1086        Arc::new(
1087            rain_engine_skills::web_reader::WebReaderSkill::with_shared_policy(
1088                settings.web_reader.access.clone(),
1089                settings.web_reader.timeout,
1090            ),
1091        ),
1092    );
1093
1094    if config.enable_research_planner {
1095        engine = engine.with_planner(Arc::new(rain_engine_cognition::ResearchPlanner::new(llm)));
1096    }
1097
1098    Ok(
1099        RuntimeState::new(engine, memory, blob_store, config.server, settings)
1100            .with_provider_kind(provider_kind),
1101    )
1102}
1103
1104pub fn app(state: RuntimeState) -> Router {
1105    Router::new()
1106        // Trigger (write) routes
1107        .route("/triggers/webhook/{source}", post(handle_webhook))
1108        .route("/triggers/external/{source}", post(handle_external_event))
1109        .route("/triggers/human/{actor_id}", post(handle_human_input))
1110        .route("/triggers/system/{source}", post(handle_system_observation))
1111        .route("/triggers/wake", post(handle_scheduled_wake))
1112        .route("/triggers/approval", post(handle_approval))
1113        .route(
1114            "/triggers/delegation-result",
1115            post(handle_delegation_result),
1116        )
1117        // Read routes
1118        .route("/health", get(handle_health))
1119        .route("/capabilities", get(handle_capabilities))
1120        .route(
1121            "/settings",
1122            get(handle_get_settings).put(handle_update_settings),
1123        )
1124        .route("/sessions", get(handle_list_sessions))
1125        .route("/sessions/views", get(handle_list_session_views))
1126        .route("/sessions/{session_id}", get(handle_get_session))
1127        .route("/sessions/{session_id}/view", get(handle_get_session_view))
1128        .route(
1129            "/sessions/{session_id}/execution-graph",
1130            get(handle_get_execution_graph),
1131        )
1132        .route("/sessions/{session_id}/records", get(handle_list_records))
1133        // Capabilities routes
1134        .route("/capabilities/skills", post(handle_install_skill))
1135        // SSE streaming
1136        .route("/sessions/{session_id}/stream", get(handle_sse_stream))
1137        .with_state(state)
1138}
1139
1140pub async fn serve(addr: SocketAddr, state: RuntimeState) -> Result<(), std::io::Error> {
1141    let listener = tokio::net::TcpListener::bind(addr).await?;
1142    axum::serve(listener, app(state)).await
1143}
1144
1145async fn handle_external_event(
1146    State(state): State<RuntimeState>,
1147    Path(source): Path<String>,
1148    Json(request): Json<EventIngressRequest>,
1149) -> Result<axum::response::Response, (StatusCode, String)> {
1150    let policy = effective_policy(&state, request.policy_override).await;
1151    let provider = effective_provider(&state, request.provider).await;
1152    let attachments = materialize_attachments(
1153        &state,
1154        policy.max_inline_attachment_bytes,
1155        request.attachments,
1156    )
1157    .await
1158    .map_err(map_ingress_error)?;
1159    run_process_request(
1160        &state,
1161        ProcessRequest {
1162            session_id: request.session_id,
1163            trigger: AgentTrigger::ExternalEvent {
1164                source,
1165                payload: request.payload,
1166                attachments,
1167            },
1168            granted_scopes: request.granted_scopes,
1169            idempotency_key: request.idempotency_key,
1170            policy,
1171            provider,
1172            cancellation: tokio_util::sync::CancellationToken::new(),
1173        },
1174    )
1175    .await
1176}
1177
1178async fn handle_human_input(
1179    State(state): State<RuntimeState>,
1180    Path(actor_id): Path<String>,
1181    Json(request): Json<HumanInputIngressRequest>,
1182) -> Result<axum::response::Response, (StatusCode, String)> {
1183    let policy = effective_policy(&state, request.policy_override).await;
1184    let provider = effective_provider(&state, request.provider).await;
1185    let attachments = materialize_attachments(
1186        &state,
1187        policy.max_inline_attachment_bytes,
1188        request.attachments,
1189    )
1190    .await
1191    .map_err(map_ingress_error)?;
1192    run_process_request(
1193        &state,
1194        ProcessRequest {
1195            session_id: request.session_id,
1196            trigger: AgentTrigger::HumanInput {
1197                actor_id,
1198                content: request.content,
1199                attachments,
1200            },
1201            granted_scopes: request.granted_scopes,
1202            idempotency_key: request.idempotency_key,
1203            policy,
1204            provider,
1205            cancellation: tokio_util::sync::CancellationToken::new(),
1206        },
1207    )
1208    .await
1209}
1210
1211async fn handle_system_observation(
1212    State(state): State<RuntimeState>,
1213    Path(source): Path<String>,
1214    Json(request): Json<EventIngressRequest>,
1215) -> Result<axum::response::Response, (StatusCode, String)> {
1216    let policy = effective_policy(&state, request.policy_override).await;
1217    let provider = effective_provider(&state, request.provider).await;
1218    let attachments = materialize_attachments(
1219        &state,
1220        policy.max_inline_attachment_bytes,
1221        request.attachments,
1222    )
1223    .await
1224    .map_err(map_ingress_error)?;
1225    run_process_request(
1226        &state,
1227        ProcessRequest {
1228            session_id: request.session_id,
1229            trigger: AgentTrigger::SystemObservation {
1230                source,
1231                observation: request.payload,
1232                attachments,
1233            },
1234            granted_scopes: request.granted_scopes,
1235            idempotency_key: request.idempotency_key,
1236            policy,
1237            provider,
1238            cancellation: tokio_util::sync::CancellationToken::new(),
1239        },
1240    )
1241    .await
1242}
1243
1244async fn handle_scheduled_wake(
1245    State(state): State<RuntimeState>,
1246    Json(request): Json<ScheduledWakeIngressRequest>,
1247) -> Result<axum::response::Response, (StatusCode, String)> {
1248    let policy = effective_policy(&state, request.policy_override).await;
1249    let provider = effective_provider(&state, request.provider).await;
1250    run_process_request(
1251        &state,
1252        ProcessRequest {
1253            session_id: request.session_id,
1254            trigger: AgentTrigger::ScheduledWake {
1255                wake_id: WakeId(request.wake_id),
1256                due_at: request.due_at,
1257                reason: request.reason,
1258            },
1259            granted_scopes: request.granted_scopes,
1260            idempotency_key: None,
1261            policy,
1262            provider,
1263            cancellation: tokio_util::sync::CancellationToken::new(),
1264        },
1265    )
1266    .await
1267}
1268
1269pub fn init_tracing() {
1270    let _ = tracing_subscriber::fmt::try_init();
1271}
1272
1273async fn handle_delegation_result(
1274    State(state): State<RuntimeState>,
1275    Json(request): Json<DelegationResultIngressRequest>,
1276) -> Result<axum::response::Response, (StatusCode, String)> {
1277    let policy = effective_policy(&state, request.policy_override).await;
1278    let provider = effective_provider(&state, request.provider).await;
1279    run_process_request(
1280        &state,
1281        ProcessRequest {
1282            session_id: request.session_id,
1283            trigger: AgentTrigger::DelegationResult {
1284                correlation_id: CorrelationId(request.correlation_id),
1285                payload: request.payload,
1286                metadata: request.metadata,
1287            },
1288            granted_scopes: request.granted_scopes,
1289            idempotency_key: None,
1290            policy,
1291            provider,
1292            cancellation: tokio_util::sync::CancellationToken::new(),
1293        },
1294    )
1295    .await
1296}
1297
1298async fn handle_webhook(
1299    State(state): State<RuntimeState>,
1300    Path(source): Path<String>,
1301    request: Request,
1302) -> Result<axum::response::Response, (StatusCode, String)> {
1303    let envelope = parse_webhook_envelope(request)
1304        .await
1305        .map_err(map_ingress_error)?;
1306    let policy = effective_policy(&state, envelope.policy_override).await;
1307    let provider = effective_provider(&state, envelope.provider).await;
1308    let attachments = materialize_attachments(
1309        &state,
1310        policy.max_inline_attachment_bytes,
1311        envelope.attachments,
1312    )
1313    .await
1314    .map_err(map_ingress_error)?;
1315
1316    run_process_request(
1317        &state,
1318        ProcessRequest {
1319            session_id: envelope.session_id,
1320            trigger: AgentTrigger::Webhook {
1321                source,
1322                payload: envelope.payload,
1323                attachments,
1324            },
1325            granted_scopes: envelope.granted_scopes,
1326            idempotency_key: envelope.idempotency_key,
1327            policy,
1328            provider,
1329            cancellation: tokio_util::sync::CancellationToken::new(),
1330        },
1331    )
1332    .await
1333}
1334
1335async fn handle_approval(
1336    State(state): State<RuntimeState>,
1337    Json(request): Json<ApprovalIngressRequest>,
1338) -> Result<axum::response::Response, (StatusCode, String)> {
1339    let policy = effective_policy(&state, request.policy_override).await;
1340    let provider = effective_provider(&state, request.provider).await;
1341    run_process_request(
1342        &state,
1343        ProcessRequest {
1344            session_id: request.session_id,
1345            trigger: AgentTrigger::Approval {
1346                resume_token: rain_engine_core::ResumeToken(request.resume_token),
1347                decision: request.decision,
1348                metadata: request.metadata,
1349            },
1350            granted_scopes: request.granted_scopes,
1351            idempotency_key: None,
1352            policy,
1353            provider,
1354            cancellation: tokio_util::sync::CancellationToken::new(),
1355        },
1356    )
1357    .await
1358}
1359
1360async fn parse_webhook_envelope(request: Request) -> Result<WebhookIngressRequest, IngressError> {
1361    let content_type = request
1362        .headers()
1363        .get(CONTENT_TYPE)
1364        .and_then(|value| value.to_str().ok())
1365        .unwrap_or("application/json")
1366        .to_string();
1367    let body = to_bytes(request.into_body(), MAX_INGRESS_BODY_BYTES)
1368        .await
1369        .map_err(|err| IngressError::Malformed(err.to_string()))?;
1370
1371    if content_type.starts_with("application/json") {
1372        return serde_json::from_slice(&body)
1373            .map_err(|err| IngressError::Malformed(err.to_string()));
1374    }
1375
1376    if content_type.starts_with("multipart/form-data") {
1377        return parse_multipart_webhook(&content_type, body).await;
1378    }
1379
1380    Err(IngressError::UnsupportedContentType)
1381}
1382
1383async fn parse_multipart_webhook(
1384    content_type: &str,
1385    body: axum::body::Bytes,
1386) -> Result<WebhookIngressRequest, IngressError> {
1387    let boundary = multer::parse_boundary(content_type)
1388        .map_err(|err| IngressError::Malformed(err.to_string()))?;
1389    let stream = stream::once(async move { Ok::<_, std::convert::Infallible>(body) });
1390    let mut multipart = multer::Multipart::new(stream, boundary);
1391
1392    let mut session_id = None::<String>;
1393    let mut payload = None::<Value>;
1394    let mut attachments = Vec::new();
1395    let mut granted_scopes = BTreeSet::new();
1396    let mut idempotency_key = None::<String>;
1397    let mut provider = None::<ProviderRequestConfig>;
1398    let mut policy_override = None::<EnginePolicy>;
1399
1400    while let Some(field) = multipart
1401        .next_field()
1402        .await
1403        .map_err(|err| IngressError::Malformed(err.to_string()))?
1404    {
1405        let name = field.name().unwrap_or_default().to_string();
1406        let file_name = field.file_name().map(str::to_string);
1407        let mime_type = field
1408            .content_type()
1409            .map(|value| value.to_string())
1410            .unwrap_or_else(|| "application/octet-stream".to_string());
1411
1412        if file_name.is_some() {
1413            attachments.push(MultimodalPayload {
1414                mime_type,
1415                file_name,
1416                data: field
1417                    .bytes()
1418                    .await
1419                    .map_err(|err| IngressError::Malformed(err.to_string()))?
1420                    .to_vec(),
1421            });
1422            continue;
1423        }
1424
1425        let text = field
1426            .text()
1427            .await
1428            .map_err(|err| IngressError::Malformed(err.to_string()))?;
1429        match name.as_str() {
1430            "session_id" => session_id = Some(text),
1431            "payload" => {
1432                payload = Some(parse_json_value(&text, "payload")?);
1433            }
1434            "granted_scope" => {
1435                if !text.trim().is_empty() {
1436                    granted_scopes.insert(text);
1437                }
1438            }
1439            "granted_scopes" => {
1440                let scopes = parse_json_value::<Vec<String>>(&text, "granted_scopes")?;
1441                granted_scopes.extend(scopes);
1442            }
1443            "idempotency_key" => {
1444                if !text.trim().is_empty() {
1445                    idempotency_key = Some(text);
1446                }
1447            }
1448            "provider" => provider = Some(parse_json_value(&text, "provider")?),
1449            "policy_override" => {
1450                policy_override = Some(parse_json_value(&text, "policy_override")?)
1451            }
1452            _ => {}
1453        }
1454    }
1455
1456    Ok(WebhookIngressRequest {
1457        session_id: session_id
1458            .ok_or_else(|| IngressError::Malformed("missing session_id".to_string()))?,
1459        payload: payload.unwrap_or(Value::Null),
1460        attachments,
1461        granted_scopes,
1462        idempotency_key,
1463        provider,
1464        policy_override,
1465    })
1466}
1467
1468fn parse_json_value<T: DeserializeOwned>(text: &str, field_name: &str) -> Result<T, IngressError> {
1469    serde_json::from_str(text)
1470        .map_err(|err| IngressError::Malformed(format!("invalid {field_name}: {err}")))
1471}
1472
1473async fn materialize_attachments(
1474    state: &RuntimeState,
1475    max_inline_attachment_bytes: usize,
1476    payloads: Vec<MultimodalPayload>,
1477) -> Result<Vec<AttachmentRef>, IngressError> {
1478    let mut attachments = Vec::with_capacity(payloads.len());
1479    for payload in payloads {
1480        let attachment_id = Uuid::new_v4().to_string();
1481        if payload.data.len() <= max_inline_attachment_bytes {
1482            attachments.push(AttachmentRef::inline(
1483                attachment_id,
1484                payload.mime_type,
1485                payload.file_name,
1486                payload.data,
1487            ));
1488        } else {
1489            attachments.push(
1490                state
1491                    .blob_store
1492                    .put(attachment_id, payload)
1493                    .await
1494                    .map_err(|err| IngressError::Blob(err.message))?,
1495            );
1496        }
1497    }
1498    Ok(attachments)
1499}
1500
1501async fn effective_policy(
1502    state: &RuntimeState,
1503    override_policy: Option<EnginePolicy>,
1504) -> EnginePolicy {
1505    if state.config.allow_policy_overrides
1506        && let Some(p) = override_policy
1507    {
1508        return p;
1509    }
1510    state.settings.engine_policy.read().await.clone()
1511}
1512
1513async fn effective_provider(
1514    state: &RuntimeState,
1515    override_provider: Option<ProviderRequestConfig>,
1516) -> ProviderRequestConfig {
1517    if state.config.allow_provider_overrides
1518        && let Some(p) = override_provider
1519    {
1520        return p;
1521    }
1522    state.settings.provider_config.read().await.clone()
1523}
1524
1525async fn run_process_request(
1526    state: &RuntimeState,
1527    mut request: ProcessRequest,
1528) -> Result<axum::response::Response, (StatusCode, String)> {
1529    // Default scopes for simple ingress if none provided
1530    if request.granted_scopes.is_empty() {
1531        request
1532            .granted_scopes
1533            .extend(state.default_scopes.iter().cloned());
1534    }
1535
1536    if state.config.async_ingress
1537        && let Some(ingress) = &state.ingress
1538    {
1539        let envelope = rain_engine_ingress::IngressEventEnvelope {
1540            session_id: request.session_id.clone(),
1541            trigger: request.trigger,
1542            granted_scopes: request.granted_scopes,
1543            idempotency_key: request.idempotency_key,
1544            policy: Some(request.policy),
1545            provider: Some(request.provider),
1546        };
1547        ingress
1548            .publish(&envelope)
1549            .await
1550            .map_err(|err| (StatusCode::INTERNAL_SERVER_ERROR, err.to_string()))?;
1551
1552        let response_body = serde_json::json!({
1553            "session_id": request.session_id,
1554            "status": "accepted"
1555        });
1556        return Ok((StatusCode::ACCEPTED, axum::Json(response_body)).into_response());
1557    }
1558
1559    let timeout = Duration::from_millis(state.config.request_timeout_ms.max(1));
1560    match tokio::time::timeout(timeout, run_until_terminal_trace(&state.engine, request)).await {
1561        Ok(Ok(result)) => Ok(axum::Json(result).into_response()),
1562        Ok(Err(err)) => Err((StatusCode::INTERNAL_SERVER_ERROR, err.to_string())),
1563        Err(_) => Err((StatusCode::REQUEST_TIMEOUT, "request timed out".to_string())),
1564    }
1565}
1566
1567pub async fn run_until_terminal_trace(
1568    engine: &AgentEngine,
1569    request: ProcessRequest,
1570) -> Result<RuntimeRunResult, EngineError> {
1571    let mut advances = Vec::new();
1572    let mut next = AdvanceRequest::Trigger(request.clone());
1573    loop {
1574        let result = engine.advance(next).await?;
1575        if let Some(outcome) = result.outcome.clone() {
1576            advances.push(result);
1577            return Ok(RuntimeRunResult { advances, outcome });
1578        }
1579        advances.push(result);
1580        next = AdvanceRequest::Continue(ContinueRequest {
1581            session_id: request.session_id.clone(),
1582            granted_scopes: request.granted_scopes.clone(),
1583            policy: request.policy.clone(),
1584            provider: request.provider.clone(),
1585            cancellation: request.cancellation.clone(),
1586        });
1587    }
1588}
1589
1590pub async fn run_until_terminal(
1591    engine: &AgentEngine,
1592    request: ProcessRequest,
1593) -> Result<EngineOutcome, EngineError> {
1594    Ok(run_until_terminal_trace(engine, request).await?.outcome)
1595}
1596
1597fn map_ingress_error(error: IngressError) -> (StatusCode, String) {
1598    match error {
1599        IngressError::UnsupportedContentType => {
1600            (StatusCode::UNSUPPORTED_MEDIA_TYPE, error.to_string())
1601        }
1602        IngressError::Malformed(_) => (StatusCode::BAD_REQUEST, error.to_string()),
1603        IngressError::Blob(_) => (StatusCode::INTERNAL_SERVER_ERROR, error.to_string()),
1604    }
1605}
1606
1607// ---------------------------------------------------------------------------
1608// Read handlers
1609// ---------------------------------------------------------------------------
1610
1611#[typeshare::typeshare]
1612#[derive(Debug, Clone, Serialize, Deserialize)]
1613pub struct HealthResponse {
1614    pub status: String,
1615    pub version: String,
1616}
1617
1618async fn handle_health() -> Json<HealthResponse> {
1619    Json(HealthResponse {
1620        status: "ok".to_string(),
1621        version: env!("CARGO_PKG_VERSION").to_string(),
1622    })
1623}
1624
1625async fn handle_capabilities(State(state): State<RuntimeState>) -> Json<RuntimeCapabilities> {
1626    Json(RuntimeCapabilities {
1627        version: env!("CARGO_PKG_VERSION").to_string(),
1628        provider_kind: state.provider_kind.clone(),
1629        default_model: state.config.default_provider.model.clone(),
1630        streaming: true,
1631        approvals: true,
1632        multipart_uploads: true,
1633        default_scopes: state.default_scopes.clone(),
1634        default_policy: state.config.default_policy.clone(),
1635        channels: state.channel_statuses.clone(),
1636        approval_metadata: ApprovalMetadataSupport {
1637            structured_json: true,
1638            recommended_fields: vec![
1639                "actor_id".to_string(),
1640                "client".to_string(),
1641                "reason".to_string(),
1642                "decided_at_ms".to_string(),
1643            ],
1644        },
1645        wake_support: true,
1646        delegation_support: true,
1647        learning_support: true,
1648        upload_limits: UploadSupport {
1649            multipart: true,
1650            max_request_bytes: MAX_INGRESS_BODY_BYTES,
1651        },
1652        skills: state.engine.skill_definitions().await,
1653    })
1654}
1655
1656async fn handle_get_settings(State(state): State<RuntimeState>) -> Json<RuntimeSettingsView> {
1657    Json(state.settings.to_view().await)
1658}
1659
1660async fn handle_update_settings(
1661    State(state): State<RuntimeState>,
1662    Json(request): Json<RuntimeSettingsUpdateRequest>,
1663) -> Json<RuntimeSettingsView> {
1664    state.settings.apply(request).await;
1665    Json(state.settings.to_view().await)
1666}
1667
1668#[typeshare::typeshare]
1669#[derive(Debug, Clone, Serialize, Deserialize, Default)]
1670pub struct SessionListParams {
1671    #[serde(default)]
1672    pub offset: Option<usize>,
1673    #[serde(default)]
1674    pub limit: Option<usize>,
1675    #[serde(default)]
1676    pub since_ms: Option<i64>,
1677    #[serde(default)]
1678    pub until_ms: Option<i64>,
1679}
1680
1681async fn handle_list_sessions(
1682    State(state): State<RuntimeState>,
1683    Query(params): Query<SessionListParams>,
1684) -> Result<Json<Value>, (StatusCode, String)> {
1685    let query = SessionListQuery {
1686        offset: params.offset.unwrap_or(0),
1687        limit: params.limit.unwrap_or(100),
1688        since_ms: params.since_ms,
1689        until_ms: params.until_ms,
1690    };
1691    let sessions = state
1692        .memory
1693        .list_sessions(query)
1694        .await
1695        .map_err(|err| (StatusCode::INTERNAL_SERVER_ERROR, err.message))?;
1696    Ok(Json(serde_json::to_value(sessions).unwrap_or_default()))
1697}
1698
1699async fn handle_list_session_views(
1700    State(state): State<RuntimeState>,
1701    Query(params): Query<SessionListParams>,
1702) -> Result<Json<Vec<SessionListItemView>>, (StatusCode, String)> {
1703    let query = SessionListQuery {
1704        offset: params.offset.unwrap_or(0),
1705        limit: params.limit.unwrap_or(100),
1706        since_ms: params.since_ms,
1707        until_ms: params.until_ms,
1708    };
1709    let sessions = state
1710        .memory
1711        .list_sessions(query)
1712        .await
1713        .map_err(|err| (StatusCode::INTERNAL_SERVER_ERROR, err.message))?;
1714
1715    let mut views = Vec::with_capacity(sessions.len());
1716    for session in sessions {
1717        let snapshot = if let Ok(Some(cached)) = state
1718            .engine
1719            .state_cache()
1720            .get_projection(&session.session_id)
1721            .await
1722        {
1723            cached
1724        } else {
1725            state
1726                .memory
1727                .load_session(&session.session_id)
1728                .await
1729                .map_err(|err| (StatusCode::INTERNAL_SERVER_ERROR, err.message))?
1730        };
1731        views.push(build_session_list_item(snapshot));
1732    }
1733
1734    views.sort_by(|left, right| right.last_activity_at_ms.cmp(&left.last_activity_at_ms));
1735    Ok(Json(views))
1736}
1737
1738async fn handle_get_session(
1739    State(state): State<RuntimeState>,
1740    Path(session_id): Path<String>,
1741) -> Result<Json<Value>, (StatusCode, String)> {
1742    let snapshot =
1743        if let Ok(Some(cached)) = state.engine.state_cache().get_projection(&session_id).await {
1744            cached
1745        } else {
1746            state
1747                .memory
1748                .load_session(&session_id)
1749                .await
1750                .map_err(|err| (StatusCode::INTERNAL_SERVER_ERROR, err.message))?
1751        };
1752    Ok(Json(serde_json::to_value(snapshot).unwrap_or_default()))
1753}
1754
1755async fn handle_get_session_view(
1756    State(state): State<RuntimeState>,
1757    Path(session_id): Path<String>,
1758) -> Result<Json<SessionView>, (StatusCode, String)> {
1759    let snapshot =
1760        if let Ok(Some(cached)) = state.engine.state_cache().get_projection(&session_id).await {
1761            cached
1762        } else {
1763            state
1764                .memory
1765                .load_session(&session_id)
1766                .await
1767                .map_err(|err| (StatusCode::INTERNAL_SERVER_ERROR, err.message))?
1768        };
1769    Ok(Json(build_session_view(snapshot)))
1770}
1771
1772async fn handle_get_execution_graph(
1773    State(state): State<RuntimeState>,
1774    Path(session_id): Path<String>,
1775) -> Result<Json<ExecutionGraphView>, (StatusCode, String)> {
1776    let snapshot =
1777        if let Ok(Some(cached)) = state.engine.state_cache().get_projection(&session_id).await {
1778            cached
1779        } else {
1780            state
1781                .memory
1782                .load_session(&session_id)
1783                .await
1784                .map_err(|err| (StatusCode::INTERNAL_SERVER_ERROR, err.message))?
1785        };
1786    Ok(Json(build_execution_graph_view(&snapshot)))
1787}
1788
1789#[typeshare::typeshare]
1790#[derive(Debug, Clone, Serialize, Deserialize, Default)]
1791pub struct RecordListParams {
1792    #[serde(default)]
1793    pub offset: Option<usize>,
1794    #[serde(default)]
1795    pub limit: Option<usize>,
1796    #[serde(default)]
1797    pub since_ms: Option<i64>,
1798    #[serde(default)]
1799    pub until_ms: Option<i64>,
1800}
1801
1802async fn handle_install_skill(
1803    State(state): State<RuntimeState>,
1804    Json(request): Json<InstallSkillRequest>,
1805) -> Result<StatusCode, (StatusCode, String)> {
1806    let direct_install_enabled = std::env::var("RAIN_ENABLE_DIRECT_SKILL_INSTALL")
1807        .map(|value| value == "true" || value == "1")
1808        .unwrap_or(false);
1809    if !direct_install_enabled {
1810        return Err((
1811            StatusCode::FORBIDDEN,
1812            "direct skill install is disabled; use operator tooling or set RAIN_ENABLE_DIRECT_SKILL_INSTALL=true"
1813                .to_string(),
1814        ));
1815    }
1816
1817    info!("Attempting to install skill: {}", request.manifest.name);
1818
1819    let wasm_bytes = if let Some(url) = &request.wasm_url {
1820        let client = reqwest::Client::new();
1821        client
1822            .get(url)
1823            .send()
1824            .await
1825            .map_err(|err| (StatusCode::BAD_GATEWAY, format!("Download failed: {}", err)))?
1826            .bytes()
1827            .await
1828            .map_err(|err| (StatusCode::BAD_GATEWAY, format!("Read failed: {}", err)))?
1829            .to_vec()
1830    } else if let Some(b64) = &request.wasm_base64 {
1831        use base64::{Engine as _, engine::general_purpose};
1832        general_purpose::STANDARD.decode(b64).map_err(|err| {
1833            (
1834                StatusCode::BAD_REQUEST,
1835                format!("Base64 decode failed: {}", err),
1836            )
1837        })?
1838    } else if let Some(path) = &request.file_path {
1839        tokio::fs::read(path).await.map_err(|err| {
1840            (
1841                StatusCode::BAD_REQUEST,
1842                format!("File read failed: {}", err),
1843            )
1844        })?
1845    } else {
1846        return Err((
1847            StatusCode::BAD_REQUEST,
1848            "Must provide wasm_url, wasm_base64, or file_path".to_string(),
1849        ));
1850    };
1851
1852    let config = rain_engine_wasm::WasmSkillConfig {
1853        manifest: request.manifest.clone(),
1854        wasm_bytes: Arc::new(wasm_bytes.clone()),
1855        capabilities: Arc::new(rain_engine_wasm::InMemoryCapabilityHost::new().with_http_client()),
1856    };
1857
1858    let executor = rain_engine_wasm::WasmSkillExecutor::new(config).map_err(|err| {
1859        (
1860            StatusCode::INTERNAL_SERVER_ERROR,
1861            format!("Init failed: {}", err),
1862        )
1863    })?;
1864    let executor = Arc::new(executor);
1865    state
1866        .engine
1867        .register_wasm_skill_persistent(request.manifest, executor.clone(), wasm_bytes)
1868        .await
1869        .map_err(|err| {
1870            (
1871                StatusCode::INTERNAL_SERVER_ERROR,
1872                format!("Storage failed: {err}"),
1873            )
1874        })?;
1875
1876    info!(
1877        "Skill successfully registered: {}",
1878        executor.manifest().name
1879    );
1880    Ok(StatusCode::CREATED)
1881}
1882
1883async fn handle_list_records(
1884    State(state): State<RuntimeState>,
1885    Path(session_id): Path<String>,
1886    Query(params): Query<RecordListParams>,
1887) -> Result<Json<Value>, (StatusCode, String)> {
1888    let query = RecordPageQuery {
1889        session_id,
1890        offset: params.offset.unwrap_or(0),
1891        limit: params.limit.unwrap_or(100),
1892        since_ms: params.since_ms,
1893        until_ms: params.until_ms,
1894    };
1895    let page = state
1896        .memory
1897        .list_records(query)
1898        .await
1899        .map_err(|err| (StatusCode::INTERNAL_SERVER_ERROR, err.message))?;
1900    Ok(Json(serde_json::to_value(page).unwrap_or_default()))
1901}
1902
1903fn build_session_view(snapshot: SessionSnapshot) -> SessionView {
1904    let state = snapshot.agent_state();
1905    let pending_approval = latest_pending_approval(&snapshot);
1906    let timeline = build_timeline(&snapshot.records);
1907    let tool_timeline = build_tool_timeline(&snapshot.records);
1908    let status = derive_session_status(&snapshot, pending_approval.as_ref());
1909    let activity_state = derive_activity_state(&snapshot, pending_approval.as_ref());
1910    let total_estimated_cost_usd = snapshot.total_estimated_cost_usd();
1911    let self_improvement = build_self_improvement_view(&snapshot);
1912    let execution_graph = build_execution_graph_view(&snapshot);
1913    let current_task = current_task(&state);
1914    let active_channel_ids = derive_active_channel_ids(&snapshot.records);
1915    let pending_wake = state.pending_wake.as_ref().map(pending_wake_view);
1916    let current_focus = derive_current_focus(
1917        &snapshot,
1918        pending_approval.as_ref(),
1919        activity_state.clone(),
1920        current_task.as_ref(),
1921        pending_wake.as_ref(),
1922    );
1923    let blocked_reason = derive_blocked_reason(
1924        pending_approval.as_ref(),
1925        current_task.as_ref(),
1926        pending_wake.as_ref(),
1927    );
1928
1929    SessionView {
1930        session_id: snapshot.session_id,
1931        status,
1932        activity_state,
1933        current_focus,
1934        current_task_id: current_task.as_ref().map(|task| task.task_id.0.clone()),
1935        current_task_title: current_task.as_ref().map(|task| task.title.clone()),
1936        next_wake_at_ms: pending_wake.as_ref().and_then(|wake| wake.due_at_ms),
1937        blocked_reason,
1938        last_human_input_at_ms: last_human_input_at_ms(&snapshot.records),
1939        last_assistant_activity_at_ms: last_assistant_activity_at_ms(&snapshot.records),
1940        active_channel_ids,
1941        pending_wake,
1942        wake_history: build_wake_history(&snapshot.records),
1943        last_heartbeat: build_last_heartbeat(&snapshot.records),
1944        last_sequence_no: snapshot.last_sequence_no,
1945        latest_outcome: snapshot.latest_outcome,
1946        pending_approval,
1947        state,
1948        timeline,
1949        tool_timeline,
1950        self_improvement,
1951        execution_graph,
1952        record_count: snapshot.records.len(),
1953        total_estimated_cost_usd,
1954    }
1955}
1956
1957fn build_session_list_item(snapshot: SessionSnapshot) -> SessionListItemView {
1958    let state = snapshot.agent_state();
1959    let pending_approval = latest_pending_approval(&snapshot);
1960    let status = derive_session_status(&snapshot, pending_approval.as_ref());
1961    let activity_state = derive_activity_state(&snapshot, pending_approval.as_ref());
1962    let pending_wake = state.pending_wake.as_ref().map(pending_wake_view);
1963    let current_task = current_task(&state);
1964    let current_focus = derive_current_focus(
1965        &snapshot,
1966        pending_approval.as_ref(),
1967        activity_state.clone(),
1968        current_task.as_ref(),
1969        pending_wake.as_ref(),
1970    );
1971
1972    SessionListItemView {
1973        session_id: snapshot.session_id.clone(),
1974        status,
1975        activity_state: activity_state.clone(),
1976        current_focus,
1977        latest_provider: latest_provider_name(&snapshot.records),
1978        last_activity_at_ms: last_activity_at_ms(&snapshot.records),
1979        pending_approval: pending_approval.is_some(),
1980        pending_wake: pending_wake.is_some(),
1981        unread_event_count: derive_unread_event_count(&snapshot.records),
1982        active_channel_ids: derive_active_channel_ids(&snapshot.records),
1983        record_count: snapshot.records.len(),
1984    }
1985}
1986
1987fn build_execution_graph_view(snapshot: &SessionSnapshot) -> ExecutionGraphView {
1988    let active_graph = snapshot.active_tool_execution_graph();
1989    let graphs = snapshot.tool_execution_graphs();
1990    let checkpoints = snapshot.tool_node_checkpoints();
1991    let validations = snapshot.skill_input_validations();
1992    let mut latest = BTreeMap::<String, rain_engine_core::ToolNodeStatus>::new();
1993    for checkpoint in checkpoints.iter().filter(|checkpoint| {
1994        active_graph
1995            .as_ref()
1996            .map(|graph| graph.graph_id == checkpoint.graph_id)
1997            .unwrap_or(false)
1998    }) {
1999        latest.insert(checkpoint.call_id.clone(), checkpoint.status.clone());
2000    }
2001    let blocked_call_ids = active_graph
2002        .as_ref()
2003        .map(|graph| {
2004            graph
2005                .nodes
2006                .iter()
2007                .filter(|node| {
2008                    node.dependencies.iter().any(|dependency| {
2009                        matches!(
2010                            latest.get(&dependency.call_id),
2011                            Some(rain_engine_core::ToolNodeStatus::Failed)
2012                                | Some(rain_engine_core::ToolNodeStatus::Skipped)
2013                                | Some(rain_engine_core::ToolNodeStatus::TimedOut)
2014                        )
2015                    })
2016                })
2017                .map(|node| node.call_id.clone())
2018                .collect()
2019        })
2020        .unwrap_or_default();
2021
2022    ExecutionGraphView {
2023        active_graph,
2024        graphs,
2025        checkpoints,
2026        validations,
2027        blocked_call_ids,
2028    }
2029}
2030
2031fn build_self_improvement_view(snapshot: &SessionSnapshot) -> SelfImprovementView {
2032    SelfImprovementView {
2033        active_overlay: snapshot.active_policy_overlay(),
2034        reflections: snapshot.reflections(),
2035        policy_tunings: snapshot.policy_tunings(),
2036        strategy_preferences: snapshot.strategy_preferences(),
2037        tool_performance: snapshot.tool_performance_records(),
2038        profile_patches: snapshot
2039            .records
2040            .iter()
2041            .filter_map(|record| match record {
2042                SessionRecord::ProfilePatch(patch) => Some(patch.clone()),
2043                _ => None,
2044            })
2045            .collect(),
2046    }
2047}
2048
2049fn derive_session_status(
2050    snapshot: &SessionSnapshot,
2051    pending_approval: Option<&ApprovalView>,
2052) -> SessionStatus {
2053    if snapshot.records.is_empty() {
2054        return SessionStatus::Empty;
2055    }
2056
2057    if pending_approval.is_some() {
2058        return SessionStatus::Suspended;
2059    }
2060
2061    match snapshot
2062        .latest_outcome
2063        .as_ref()
2064        .map(|outcome| &outcome.stop_reason)
2065    {
2066        None => SessionStatus::Running,
2067        Some(StopReason::Responded | StopReason::Yielded) => SessionStatus::Completed,
2068        Some(StopReason::Suspended) => SessionStatus::Suspended,
2069        Some(StopReason::Delegated) => SessionStatus::Delegated,
2070        Some(
2071            StopReason::ProviderFailure
2072            | StopReason::StorageFailure
2073            | StopReason::PolicyAborted
2074            | StopReason::DeadlineExceeded
2075            | StopReason::Cancelled,
2076        ) => SessionStatus::Failed,
2077        Some(StopReason::MaxStepsReached) => SessionStatus::Stopped,
2078    }
2079}
2080
2081fn derive_activity_state(
2082    snapshot: &SessionSnapshot,
2083    pending_approval: Option<&ApprovalView>,
2084) -> SessionActivityState {
2085    if pending_approval.is_some() {
2086        return SessionActivityState::WaitingHuman;
2087    }
2088
2089    if snapshot.active_tool_execution_graph().is_some() {
2090        return SessionActivityState::RunningTools;
2091    }
2092
2093    if snapshot
2094        .records
2095        .iter()
2096        .rev()
2097        .find_map(|record| match record {
2098            SessionRecord::Deliberation(_) => Some(SessionActivityState::Reasoning),
2099            SessionRecord::Delegation(_) => Some(SessionActivityState::Delegated),
2100            SessionRecord::KernelEvent(event) => match &event.event {
2101                rain_engine_core::KernelEvent::TaskBlocked { .. } => {
2102                    Some(SessionActivityState::WaitingExternal)
2103                }
2104                rain_engine_core::KernelEvent::WakeRequested(_)
2105                | rain_engine_core::KernelEvent::WakeScheduled(_) => {
2106                    Some(SessionActivityState::Scheduled)
2107                }
2108                _ => None,
2109            },
2110            SessionRecord::Outcome(outcome) => match outcome.stop_reason {
2111                StopReason::Delegated => Some(SessionActivityState::Delegated),
2112                StopReason::ProviderFailure
2113                | StopReason::StorageFailure
2114                | StopReason::PolicyAborted
2115                | StopReason::DeadlineExceeded
2116                | StopReason::Cancelled => Some(SessionActivityState::Errored),
2117                _ => None,
2118            },
2119            _ => None,
2120        })
2121        .is_some()
2122    {
2123        return snapshot
2124            .records
2125            .iter()
2126            .rev()
2127            .find_map(|record| match record {
2128                SessionRecord::Deliberation(_) => Some(SessionActivityState::Reasoning),
2129                SessionRecord::Delegation(_) => Some(SessionActivityState::Delegated),
2130                SessionRecord::KernelEvent(event) => match &event.event {
2131                    rain_engine_core::KernelEvent::TaskBlocked { .. } => {
2132                        Some(SessionActivityState::WaitingExternal)
2133                    }
2134                    rain_engine_core::KernelEvent::WakeRequested(_)
2135                    | rain_engine_core::KernelEvent::WakeScheduled(_) => {
2136                        Some(SessionActivityState::Scheduled)
2137                    }
2138                    _ => None,
2139                },
2140                SessionRecord::Outcome(outcome) => match outcome.stop_reason {
2141                    StopReason::Delegated => Some(SessionActivityState::Delegated),
2142                    StopReason::ProviderFailure
2143                    | StopReason::StorageFailure
2144                    | StopReason::PolicyAborted
2145                    | StopReason::DeadlineExceeded
2146                    | StopReason::Cancelled => Some(SessionActivityState::Errored),
2147                    _ => None,
2148                },
2149                _ => None,
2150            })
2151            .unwrap_or(SessionActivityState::Idle);
2152    }
2153
2154    match snapshot
2155        .latest_outcome
2156        .as_ref()
2157        .map(|outcome| &outcome.stop_reason)
2158    {
2159        Some(StopReason::Delegated) => SessionActivityState::Delegated,
2160        Some(
2161            StopReason::ProviderFailure
2162            | StopReason::StorageFailure
2163            | StopReason::PolicyAborted
2164            | StopReason::DeadlineExceeded
2165            | StopReason::Cancelled,
2166        ) => SessionActivityState::Errored,
2167        _ if snapshot.agent_state().pending_wake.is_some() => SessionActivityState::Scheduled,
2168        _ => SessionActivityState::Idle,
2169    }
2170}
2171
2172fn current_task(state: &AgentStateSnapshot) -> Option<rain_engine_core::TaskRecord> {
2173    let priority_order = |status: &rain_engine_core::TaskStatus| match status {
2174        rain_engine_core::TaskStatus::Running => 0,
2175        rain_engine_core::TaskStatus::Ready => 1,
2176        rain_engine_core::TaskStatus::WaitingHuman => 2,
2177        rain_engine_core::TaskStatus::Blocked => 3,
2178        rain_engine_core::TaskStatus::Pending => 4,
2179        rain_engine_core::TaskStatus::Failed => 5,
2180        rain_engine_core::TaskStatus::Done => 6,
2181        rain_engine_core::TaskStatus::Abandoned => 7,
2182    };
2183
2184    state
2185        .tasks
2186        .iter()
2187        .filter(|task| {
2188            !matches!(
2189                task.status,
2190                rain_engine_core::TaskStatus::Done | rain_engine_core::TaskStatus::Abandoned
2191            )
2192        })
2193        .min_by_key(|task| (priority_order(&task.status), unix_time_ms(task.created_at)))
2194        .cloned()
2195}
2196
2197fn derive_current_focus(
2198    snapshot: &SessionSnapshot,
2199    pending_approval: Option<&ApprovalView>,
2200    activity_state: SessionActivityState,
2201    current_task: Option<&rain_engine_core::TaskRecord>,
2202    pending_wake: Option<&WakeView>,
2203) -> Option<String> {
2204    if let Some(approval) = pending_approval {
2205        return Some(approval.reason.clone());
2206    }
2207
2208    if let Some(task) = current_task {
2209        return Some(match task.status {
2210            rain_engine_core::TaskStatus::Running => format!("Working on {}", task.title),
2211            rain_engine_core::TaskStatus::Ready => format!("Ready to start {}", task.title),
2212            rain_engine_core::TaskStatus::WaitingHuman => {
2213                format!("Awaiting approval for {}", task.title)
2214            }
2215            rain_engine_core::TaskStatus::Blocked => format!("Blocked on {}", task.title),
2216            rain_engine_core::TaskStatus::Pending => format!("Queued {}", task.title),
2217            rain_engine_core::TaskStatus::Failed => format!("Recovering {}", task.title),
2218            rain_engine_core::TaskStatus::Done | rain_engine_core::TaskStatus::Abandoned => {
2219                task.title.clone()
2220            }
2221        });
2222    }
2223
2224    if let Some(wake) = pending_wake {
2225        return Some(format!("Scheduled wake: {}", wake.reason));
2226    }
2227
2228    if let Some(deliberation) = snapshot
2229        .records
2230        .iter()
2231        .rev()
2232        .find_map(|record| match record {
2233            SessionRecord::Deliberation(deliberation) => Some(deliberation.summary.clone()),
2234            _ => None,
2235        })
2236    {
2237        return Some(deliberation);
2238    }
2239
2240    if let Some(outcome) = snapshot.latest_outcome.as_ref() {
2241        return outcome
2242            .response
2243            .clone()
2244            .or_else(|| outcome.detail.clone())
2245            .map(|text| truncate_preview(&text));
2246    }
2247
2248    Some(match activity_state {
2249        SessionActivityState::RunningTools => "Executing tools".to_string(),
2250        SessionActivityState::Reasoning => "Evaluating next action".to_string(),
2251        SessionActivityState::WaitingExternal => "Waiting on external state".to_string(),
2252        SessionActivityState::Scheduled => "Waiting for scheduled wake".to_string(),
2253        SessionActivityState::Delegated => "Waiting on delegated work".to_string(),
2254        SessionActivityState::Errored => "Investigating a failed step".to_string(),
2255        SessionActivityState::WaitingHuman => "Waiting on a human decision".to_string(),
2256        SessionActivityState::Idle => "Idle until the next event".to_string(),
2257    })
2258}
2259
2260fn derive_blocked_reason(
2261    pending_approval: Option<&ApprovalView>,
2262    current_task: Option<&rain_engine_core::TaskRecord>,
2263    pending_wake: Option<&WakeView>,
2264) -> Option<String> {
2265    if let Some(approval) = pending_approval {
2266        return Some(approval.reason.clone());
2267    }
2268    if let Some(task) = current_task
2269        && matches!(
2270            task.status,
2271            rain_engine_core::TaskStatus::Blocked | rain_engine_core::TaskStatus::WaitingHuman
2272        )
2273    {
2274        return task.detail.clone().or_else(|| {
2275            if task.blocked_by.is_empty() {
2276                None
2277            } else {
2278                Some(format!("blocked by {}", task.blocked_by.len()))
2279            }
2280        });
2281    }
2282    pending_wake.map(|wake| wake.reason.clone())
2283}
2284
2285fn derive_active_channel_ids(records: &[SessionRecord]) -> Vec<String> {
2286    let mut channels = BTreeSet::new();
2287    for record in records {
2288        if let SessionRecord::Trigger(trigger) = record {
2289            match &trigger.trigger {
2290                AgentTrigger::HumanInput { actor_id, .. }
2291                | AgentTrigger::Message {
2292                    user_id: actor_id, ..
2293                } => {
2294                    if let Some((channel, _)) = actor_id.split_once(':') {
2295                        channels.insert(channel.to_string());
2296                    }
2297                }
2298                _ => {}
2299            }
2300        }
2301    }
2302    channels.into_iter().collect()
2303}
2304
2305fn last_human_input_at_ms(records: &[SessionRecord]) -> Option<i64> {
2306    records.iter().rev().find_map(|record| match record {
2307        SessionRecord::Trigger(trigger)
2308            if matches!(
2309                trigger.trigger,
2310                AgentTrigger::HumanInput { .. } | AgentTrigger::Message { .. }
2311            ) =>
2312        {
2313            Some(unix_time_ms(trigger.recorded_at))
2314        }
2315        _ => None,
2316    })
2317}
2318
2319fn last_assistant_activity_at_ms(records: &[SessionRecord]) -> Option<i64> {
2320    records.iter().rev().find_map(|record| match record {
2321        SessionRecord::Outcome(outcome) => Some(unix_time_ms(outcome.finished_at)),
2322        SessionRecord::ToolResult(result) => Some(unix_time_ms(result.finished_at)),
2323        SessionRecord::Deliberation(deliberation) => Some(unix_time_ms(deliberation.created_at)),
2324        _ => None,
2325    })
2326}
2327
2328fn last_activity_at_ms(records: &[SessionRecord]) -> i64 {
2329    records
2330        .iter()
2331        .rev()
2332        .find_map(record_occurred_at_ms)
2333        .unwrap_or_default()
2334}
2335
2336fn record_occurred_at_ms(record: &SessionRecord) -> Option<i64> {
2337    match record {
2338        SessionRecord::Trigger(trigger) => Some(unix_time_ms(trigger.recorded_at)),
2339        SessionRecord::TriggerIntent(intent) => Some(unix_time_ms(intent.classified_at)),
2340        SessionRecord::KernelEvent(event) => Some(unix_time_ms(event.occurred_at)),
2341        SessionRecord::ModelDecision(decision) => Some(unix_time_ms(decision.decided_at)),
2342        SessionRecord::Deliberation(deliberation) => Some(unix_time_ms(deliberation.created_at)),
2343        SessionRecord::ToolExecutionGraph(graph) => Some(unix_time_ms(graph.created_at)),
2344        SessionRecord::ToolNodeCheckpoint(checkpoint) => Some(unix_time_ms(checkpoint.occurred_at)),
2345        SessionRecord::SkillInputValidation(validation) => {
2346            Some(unix_time_ms(validation.validated_at))
2347        }
2348        SessionRecord::ToolCall(call) => Some(unix_time_ms(call.called_at)),
2349        SessionRecord::ToolResult(result) => Some(unix_time_ms(result.finished_at)),
2350        SessionRecord::PendingApproval(approval) => Some(unix_time_ms(approval.created_at)),
2351        SessionRecord::ApprovalResolution(resolution) => Some(unix_time_ms(resolution.resolved_at)),
2352        SessionRecord::Delegation(delegation) => Some(unix_time_ms(delegation.created_at)),
2353        SessionRecord::CoordinationClaim(claim) => Some(unix_time_ms(claim.claimed_at)),
2354        SessionRecord::ProviderUsage(usage) => Some(unix_time_ms(usage.recorded_at)),
2355        SessionRecord::ProviderCache(cache) => Some(unix_time_ms(cache.cached_at)),
2356        SessionRecord::Reflection(reflection) => Some(unix_time_ms(reflection.created_at)),
2357        SessionRecord::PolicyTuning(tuning) => Some(unix_time_ms(tuning.created_at)),
2358        SessionRecord::StrategyPreference(preference) => Some(unix_time_ms(preference.created_at)),
2359        SessionRecord::ToolPerformance(perf) => Some(unix_time_ms(perf.created_at)),
2360        SessionRecord::ProfilePatch(patch) => Some(unix_time_ms(patch.created_at)),
2361        SessionRecord::ExecutionPlan(plan) => Some(unix_time_ms(plan.created_at)),
2362        SessionRecord::Summary(summary) => Some(unix_time_ms(summary.created_at)),
2363        SessionRecord::Outcome(outcome) => Some(unix_time_ms(outcome.finished_at)),
2364    }
2365}
2366
2367fn latest_provider_name(records: &[SessionRecord]) -> Option<String> {
2368    records.iter().rev().find_map(|record| match record {
2369        SessionRecord::ProviderUsage(usage) => Some(usage.provider_name.clone()),
2370        _ => None,
2371    })
2372}
2373
2374fn derive_unread_event_count(records: &[SessionRecord]) -> usize {
2375    let last_human_at = last_human_input_at_ms(records).unwrap_or_default();
2376    records
2377        .iter()
2378        .filter(|record| {
2379            record_occurred_at_ms(record).is_some_and(|at| {
2380                at >= last_human_at
2381                    && !matches!(
2382                        record,
2383                        SessionRecord::Trigger(trigger)
2384                            if matches!(
2385                                trigger.trigger,
2386                                AgentTrigger::HumanInput { .. } | AgentTrigger::Message { .. }
2387                            )
2388                    )
2389            })
2390        })
2391        .count()
2392}
2393
2394fn pending_wake_view(wake: &rain_engine_core::WakeRequestRecord) -> WakeView {
2395    WakeView {
2396        wake_id: wake.wake_id.0.clone(),
2397        reason: wake.reason.clone(),
2398        status: "scheduled".to_string(),
2399        occurred_at_ms: unix_time_ms(wake.requested_at),
2400        due_at_ms: Some(unix_time_ms(wake.due_at)),
2401        task_id: wake.task_id.as_ref().map(|task_id| task_id.0.clone()),
2402    }
2403}
2404
2405fn build_wake_history(records: &[SessionRecord]) -> Vec<WakeView> {
2406    let mut wakes = Vec::new();
2407    for record in records.iter().rev() {
2408        match record {
2409            SessionRecord::KernelEvent(event) => match &event.event {
2410                rain_engine_core::KernelEvent::WakeRequested(wake)
2411                | rain_engine_core::KernelEvent::WakeScheduled(wake) => wakes.push(WakeView {
2412                    wake_id: wake.wake_id.0.clone(),
2413                    reason: wake.reason.clone(),
2414                    status: "scheduled".to_string(),
2415                    occurred_at_ms: unix_time_ms(event.occurred_at),
2416                    due_at_ms: Some(unix_time_ms(wake.due_at)),
2417                    task_id: wake.task_id.as_ref().map(|task_id| task_id.0.clone()),
2418                }),
2419                rain_engine_core::KernelEvent::WakeCompleted {
2420                    wake_id, reason, ..
2421                } => {
2422                    wakes.push(WakeView {
2423                        wake_id: wake_id.0.clone(),
2424                        reason: reason.clone(),
2425                        status: "completed".to_string(),
2426                        occurred_at_ms: unix_time_ms(event.occurred_at),
2427                        due_at_ms: None,
2428                        task_id: None,
2429                    });
2430                }
2431                _ => {}
2432            },
2433            SessionRecord::Trigger(trigger)
2434                if matches!(trigger.trigger, AgentTrigger::ScheduledWake { .. }) =>
2435            {
2436                if let AgentTrigger::ScheduledWake {
2437                    wake_id,
2438                    due_at,
2439                    reason,
2440                } = &trigger.trigger
2441                {
2442                    wakes.push(WakeView {
2443                        wake_id: wake_id.0.clone(),
2444                        reason: reason.clone(),
2445                        status: "fired".to_string(),
2446                        occurred_at_ms: unix_time_ms(trigger.recorded_at),
2447                        due_at_ms: Some(unix_time_ms(*due_at)),
2448                        task_id: None,
2449                    });
2450                }
2451            }
2452            _ => {}
2453        }
2454        if wakes.len() >= 12 {
2455            break;
2456        }
2457    }
2458    wakes
2459}
2460
2461fn build_last_heartbeat(records: &[SessionRecord]) -> Option<HeartbeatStatusView> {
2462    let mut latest_wake: Option<(String, String, i64)> = None;
2463    for record in records.iter().rev() {
2464        if let SessionRecord::Trigger(trigger) = record
2465            && let AgentTrigger::ScheduledWake {
2466                wake_id, reason, ..
2467            } = &trigger.trigger
2468            && reason.to_ascii_lowercase().contains("heartbeat")
2469        {
2470            latest_wake = Some((
2471                wake_id.0.clone(),
2472                reason.clone(),
2473                unix_time_ms(trigger.recorded_at),
2474            ));
2475            break;
2476        }
2477    }
2478
2479    let (wake_id, reason, occurred_at_ms) = latest_wake?;
2480    let outcome = records.iter().rev().find_map(|record| match record {
2481        SessionRecord::Outcome(outcome) if unix_time_ms(outcome.finished_at) >= occurred_at_ms => {
2482            Some(outcome)
2483        }
2484        _ => None,
2485    });
2486
2487    Some(HeartbeatStatusView {
2488        wake_id,
2489        reason,
2490        occurred_at_ms,
2491        outcome_summary: outcome
2492            .and_then(|outcome| outcome.response.clone().or_else(|| outcome.detail.clone()))
2493            .map(|text| truncate_preview(&text)),
2494        stop_reason: outcome.map(|outcome| outcome.stop_reason.clone()),
2495    })
2496}
2497
2498fn latest_pending_approval(snapshot: &SessionSnapshot) -> Option<ApprovalView> {
2499    let resolved = snapshot
2500        .records
2501        .iter()
2502        .filter_map(|record| match record {
2503            SessionRecord::ApprovalResolution(resolution) => {
2504                Some(resolution.resume_token.0.clone())
2505            }
2506            _ => None,
2507        })
2508        .collect::<BTreeSet<_>>();
2509
2510    let target_token = snapshot
2511        .latest_outcome
2512        .as_ref()
2513        .filter(|outcome| outcome.stop_reason == StopReason::Suspended)
2514        .and_then(|outcome| outcome.resume_token.as_ref())
2515        .map(|token| token.0.as_str());
2516
2517    snapshot
2518        .records
2519        .iter()
2520        .rev()
2521        .find_map(|record| match record {
2522            SessionRecord::PendingApproval(approval)
2523                if !resolved.contains(&approval.resume_token.0)
2524                    && target_token
2525                        .map(|token| token == approval.resume_token.0.as_str())
2526                        .unwrap_or(true) =>
2527            {
2528                Some(approval_view(approval))
2529            }
2530            _ => None,
2531        })
2532}
2533
2534fn approval_view(record: &PendingApprovalRecord) -> ApprovalView {
2535    ApprovalView {
2536        resume_token: record.resume_token.0.clone(),
2537        created_at_ms: unix_time_ms(record.created_at),
2538        trigger_id: record.trigger_id.clone(),
2539        step: record.step,
2540        reason: match &record.reason {
2541            rain_engine_core::SuspendReason::HumanApprovalRequired { skill_names } => {
2542                format!("Human approval required for {}", skill_names.join(", "))
2543            }
2544            rain_engine_core::SuspendReason::ProviderRequested { message } => message.clone(),
2545        },
2546        pending_calls: record.pending_calls.clone(),
2547    }
2548}
2549
2550fn build_timeline(records: &[SessionRecord]) -> Vec<TimelineItem> {
2551    records
2552        .iter()
2553        .filter_map(|record| match record {
2554            SessionRecord::Trigger(trigger) => match &trigger.trigger {
2555                AgentTrigger::HumanInput {
2556                    actor_id, content, ..
2557                } => Some(TimelineItem::HumanInput {
2558                    actor_id: actor_id.clone(),
2559                    content: content.clone(),
2560                    occurred_at_ms: unix_time_ms(trigger.recorded_at),
2561                }),
2562                AgentTrigger::ExternalEvent { source, .. }
2563                | AgentTrigger::Webhook { source, .. }
2564                | AgentTrigger::SystemObservation { source, .. } => Some(TimelineItem::System {
2565                    label: "event received".to_string(),
2566                    detail: source.clone(),
2567                    occurred_at_ms: unix_time_ms(trigger.recorded_at),
2568                }),
2569                AgentTrigger::ScheduledWake { reason, .. } => Some(TimelineItem::System {
2570                    label: "scheduled wake".to_string(),
2571                    detail: reason.clone(),
2572                    occurred_at_ms: unix_time_ms(trigger.recorded_at),
2573                }),
2574                AgentTrigger::Approval { decision, .. } => Some(TimelineItem::System {
2575                    label: "approval submitted".to_string(),
2576                    detail: format!("{decision:?}"),
2577                    occurred_at_ms: unix_time_ms(trigger.recorded_at),
2578                }),
2579                AgentTrigger::DelegationResult { correlation_id, .. } => {
2580                    Some(TimelineItem::System {
2581                        label: "delegation result".to_string(),
2582                        detail: correlation_id.0.clone(),
2583                        occurred_at_ms: unix_time_ms(trigger.recorded_at),
2584                    })
2585                }
2586                AgentTrigger::RuleTrigger { rule_id, .. } => Some(TimelineItem::System {
2587                    label: "rule trigger".to_string(),
2588                    detail: rule_id.clone(),
2589                    occurred_at_ms: unix_time_ms(trigger.recorded_at),
2590                }),
2591                AgentTrigger::ProactiveHeartbeat { .. } => Some(TimelineItem::System {
2592                    label: "heartbeat".to_string(),
2593                    detail: "runtime wake".to_string(),
2594                    occurred_at_ms: unix_time_ms(trigger.recorded_at),
2595                }),
2596                AgentTrigger::Message {
2597                    user_id, content, ..
2598                } => Some(TimelineItem::HumanInput {
2599                    actor_id: user_id.clone(),
2600                    content: content.clone(),
2601                    occurred_at_ms: unix_time_ms(trigger.recorded_at),
2602                }),
2603            },
2604            SessionRecord::TriggerIntent(intent) => Some(TimelineItem::System {
2605                label: "intent classified".to_string(),
2606                detail: intent.intent.clone(),
2607                occurred_at_ms: unix_time_ms(intent.classified_at),
2608            }),
2609            SessionRecord::KernelEvent(event) => match &event.event {
2610                rain_engine_core::KernelEvent::WakeRequested(wake)
2611                | rain_engine_core::KernelEvent::WakeScheduled(wake) => {
2612                    Some(TimelineItem::System {
2613                        label: "wake scheduled".to_string(),
2614                        detail: format!("{} · {}", wake.wake_id.0, wake.reason),
2615                        occurred_at_ms: unix_time_ms(event.occurred_at),
2616                    })
2617                }
2618                rain_engine_core::KernelEvent::WakeCompleted {
2619                    wake_id, reason, ..
2620                } => Some(TimelineItem::System {
2621                    label: "wake completed".to_string(),
2622                    detail: format!("{}: {}", wake_id.0, reason),
2623                    occurred_at_ms: unix_time_ms(event.occurred_at),
2624                }),
2625                _ => None,
2626            },
2627            SessionRecord::Outcome(outcome) => outcome
2628                .response
2629                .clone()
2630                .or_else(|| outcome.detail.clone())
2631                .map(|content| TimelineItem::AssistantResponse {
2632                    content,
2633                    stop_reason: outcome.stop_reason.clone(),
2634                    occurred_at_ms: unix_time_ms(outcome.finished_at),
2635                }),
2636            SessionRecord::ToolCall(call) => Some(TimelineItem::ToolCall {
2637                call_id: call.call_id.clone(),
2638                skill_name: call.skill_name.clone(),
2639                formatted_call: format_tool_call(&call.skill_name, &call.args),
2640                occurred_at_ms: unix_time_ms(call.called_at),
2641            }),
2642            SessionRecord::ToolResult(result) => {
2643                let (success, preview) = tool_result_preview(result);
2644                Some(TimelineItem::ToolResult {
2645                    call_id: result.call_id.clone(),
2646                    skill_name: result.skill_name.clone(),
2647                    success,
2648                    preview,
2649                    occurred_at_ms: unix_time_ms(result.finished_at),
2650                })
2651            }
2652            SessionRecord::PendingApproval(approval) => Some(TimelineItem::ApprovalRequested {
2653                resume_token: approval.resume_token.0.clone(),
2654                pending_calls: approval.pending_calls.clone(),
2655                occurred_at_ms: unix_time_ms(approval.created_at),
2656            }),
2657            SessionRecord::ApprovalResolution(resolution) => Some(TimelineItem::ApprovalResolved {
2658                resume_token: resolution.resume_token.0.clone(),
2659                decision: resolution.decision.clone(),
2660                occurred_at_ms: unix_time_ms(resolution.resolved_at),
2661            }),
2662            SessionRecord::Deliberation(deliberation) => Some(TimelineItem::Plan {
2663                summary: deliberation.summary.clone(),
2664                candidate_actions: deliberation.candidate_actions.clone(),
2665                confidence: deliberation.confidence,
2666                outcome: deliberation.outcome.clone(),
2667                occurred_at_ms: unix_time_ms(deliberation.created_at),
2668            }),
2669            SessionRecord::ToolNodeCheckpoint(checkpoint) => Some(TimelineItem::ToolCheckpoint {
2670                call_id: checkpoint.call_id.clone(),
2671                skill_name: checkpoint.skill_name.clone(),
2672                status: checkpoint.status.clone(),
2673                attempt: checkpoint.attempt,
2674                detail: checkpoint.detail.clone(),
2675                occurred_at_ms: unix_time_ms(checkpoint.occurred_at),
2676            }),
2677            SessionRecord::SkillInputValidation(validation) if !validation.valid => {
2678                Some(TimelineItem::ValidationFailure {
2679                    call_id: validation.call_id.clone(),
2680                    skill_name: validation.skill_name.clone(),
2681                    errors: validation.errors.clone(),
2682                    occurred_at_ms: unix_time_ms(validation.validated_at),
2683                })
2684            }
2685            SessionRecord::Reflection(reflection) => Some(TimelineItem::Learning {
2686                label: "reflection".to_string(),
2687                detail: reflection.summary.clone(),
2688                confidence: reflection.confidence,
2689                occurred_at_ms: unix_time_ms(reflection.created_at),
2690            }),
2691            SessionRecord::PolicyTuning(tuning) => Some(TimelineItem::Learning {
2692                label: format!("policy {:?}", tuning.action).to_ascii_lowercase(),
2693                detail: tuning.overlay.reason.clone(),
2694                confidence: tuning.overlay.confidence,
2695                occurred_at_ms: unix_time_ms(tuning.created_at),
2696            }),
2697            SessionRecord::StrategyPreference(preference) => Some(TimelineItem::Learning {
2698                label: "strategy preference".to_string(),
2699                detail: preference.reason.clone(),
2700                confidence: preference.confidence,
2701                occurred_at_ms: unix_time_ms(preference.created_at),
2702            }),
2703            _ => None,
2704        })
2705        .collect()
2706}
2707
2708fn build_tool_timeline(records: &[SessionRecord]) -> Vec<ToolTimelineItem> {
2709    let results = records
2710        .iter()
2711        .filter_map(|record| match record {
2712            SessionRecord::ToolResult(result) => Some((result.call_id.clone(), result)),
2713            _ => None,
2714        })
2715        .collect::<BTreeMap<_, _>>();
2716
2717    records
2718        .iter()
2719        .filter_map(|record| match record {
2720            SessionRecord::ToolCall(call) => Some(tool_timeline_item(
2721                call,
2722                results.get(&call.call_id).copied(),
2723            )),
2724            _ => None,
2725        })
2726        .collect()
2727}
2728
2729fn tool_timeline_item(
2730    call: &ToolCallRecord,
2731    result: Option<&ToolResultRecord>,
2732) -> ToolTimelineItem {
2733    let (success, output_preview, failure_kind, finished_at_ms) = match result {
2734        Some(result) => {
2735            let (success, preview) = tool_result_preview(result);
2736            let failure_kind = match &result.output {
2737                Ok(_) => None,
2738                Err(error) => Some(format!("{:?}", error.kind)),
2739            };
2740            (
2741                Some(success),
2742                Some(preview),
2743                failure_kind,
2744                Some(unix_time_ms(result.finished_at)),
2745            )
2746        }
2747        None => (None, None, None, None),
2748    };
2749
2750    ToolTimelineItem {
2751        call_id: call.call_id.clone(),
2752        skill_name: call.skill_name.clone(),
2753        step: call.step,
2754        called_at_ms: unix_time_ms(call.called_at),
2755        finished_at_ms,
2756        backend_kind: format!("{:?}", call.backend_kind),
2757        args: call.args.clone(),
2758        success,
2759        output_preview,
2760        failure_kind,
2761    }
2762}
2763
2764fn tool_result_preview(result: &ToolResultRecord) -> (bool, String) {
2765    match &result.output {
2766        Ok(value) => (true, preview_value(value)),
2767        Err(error) => (false, error.message.clone()),
2768    }
2769}
2770
2771fn preview_value(value: &Value) -> String {
2772    if let Some(stdout) = value.get("stdout").and_then(Value::as_str) {
2773        return truncate_preview(stdout);
2774    }
2775    if let Some(content) = value.get("content").and_then(Value::as_str) {
2776        return truncate_preview(content);
2777    }
2778    if let Some(text) = value.as_str() {
2779        return truncate_preview(text);
2780    }
2781    truncate_preview(&serde_json::to_string(value).unwrap_or_else(|_| "<unprintable>".to_string()))
2782}
2783
2784fn truncate_preview(value: &str) -> String {
2785    const MAX_PREVIEW_CHARS: usize = 240;
2786    let mut preview = value
2787        .trim()
2788        .chars()
2789        .take(MAX_PREVIEW_CHARS)
2790        .collect::<String>();
2791    if value.trim().chars().count() > MAX_PREVIEW_CHARS {
2792        preview.push('…');
2793    }
2794    preview
2795}
2796
2797fn format_tool_call(name: &str, args: &Value) -> String {
2798    match args {
2799        Value::Object(map) if map.is_empty() => format!("{name}()"),
2800        Value::Object(map) => {
2801            let rendered = map
2802                .iter()
2803                .map(|(key, value)| format!("{key}: {}", preview_value(value)))
2804                .collect::<Vec<_>>()
2805                .join(", ");
2806            format!("{name}({rendered})")
2807        }
2808        other => format!("{name}({})", preview_value(other)),
2809    }
2810}
2811
2812// ---------------------------------------------------------------------------
2813// SSE streaming
2814// ---------------------------------------------------------------------------
2815
2816async fn handle_sse_stream(
2817    State(state): State<RuntimeState>,
2818    Path(session_id): Path<String>,
2819) -> Sse<impl futures_util::Stream<Item = Result<Event, std::convert::Infallible>>> {
2820    let memory = state.memory.clone();
2821    let mut last_seq: Option<i64> = None;
2822
2823    let stream = async_stream::stream! {
2824        loop {
2825            let snapshot = match memory.load_session(&session_id).await {
2826                Ok(snap) => snap,
2827                Err(_) => {
2828                    tokio::time::sleep(Duration::from_secs(2)).await;
2829                    continue;
2830                }
2831            };
2832
2833            let current_seq = snapshot.last_sequence_no;
2834            if current_seq != last_seq {
2835                last_seq = current_seq;
2836                let session_view = build_session_view(snapshot.clone());
2837                if let Ok(json) = serde_json::to_string(&session_view) {
2838                    yield Ok(Event::default().event("session_view").data(json));
2839                }
2840                if let Ok(json) = serde_json::to_string(&session_view.execution_graph) {
2841                    yield Ok(Event::default().event("execution_graph").data(json));
2842                }
2843                if let Ok(json) = serde_json::to_string(&session_view.self_improvement) {
2844                    yield Ok(Event::default().event("learning").data(json));
2845                }
2846                if let Ok(json) = serde_json::to_string(&session_view.pending_approval) {
2847                    yield Ok(Event::default().event("approval").data(json));
2848                }
2849                if let Ok(json) = serde_json::to_string(&session_view.wake_history) {
2850                    yield Ok(Event::default().event("wake").data(json));
2851                }
2852                if let Ok(json) = serde_json::to_string(&snapshot.records) {
2853                    yield Ok(Event::default().event("records").data(json));
2854                }
2855            }
2856
2857            tokio::time::sleep(Duration::from_millis(500)).await;
2858        }
2859    };
2860
2861    Sse::new(stream).keep_alive(KeepAlive::default())
2862}
2863
2864#[cfg(test)]
2865mod tests {
2866    use super::*;
2867    use async_trait::async_trait;
2868    use axum::body::Body;
2869    use axum::http::Request;
2870    use rain_engine_blob::LocalFileBlobStore;
2871    use rain_engine_core::{
2872        AgentAction, NativeSkill, PlannedSkillCall, SessionRecord, SkillExecutionError,
2873        SkillInvocation, SkillManifest, StopReason,
2874    };
2875    use serde_json::json;
2876    use tower::ServiceExt;
2877
2878    #[derive(Clone)]
2879    struct ApprovalNativeSkill;
2880
2881    #[async_trait]
2882    impl NativeSkill for ApprovalNativeSkill {
2883        async fn execute(
2884            &self,
2885            invocation: SkillInvocation,
2886        ) -> Result<serde_json::Value, SkillExecutionError> {
2887            Ok(json!({"approved": invocation.args}))
2888        }
2889
2890        fn requires_human_approval(&self) -> bool {
2891            true
2892        }
2893    }
2894
2895    fn server_config() -> RuntimeServerConfig {
2896        RuntimeServerConfig {
2897            bind_address: "127.0.0.1:0".parse().expect("addr"),
2898            request_timeout_ms: 1_000,
2899            default_policy: EnginePolicy::default(),
2900            allow_policy_overrides: true,
2901            allow_provider_overrides: true,
2902            default_provider: ProviderRequestConfig::default(),
2903            async_ingress: false,
2904        }
2905    }
2906
2907    fn runtime_state_with_mock(response: &str) -> RuntimeState {
2908        let memory: Arc<dyn MemoryStore> = Arc::new(InMemoryMemoryStore::new());
2909        let blob_store: Arc<dyn BlobStore> =
2910            Arc::from(build_blob_store(&BlobBootstrapConfig::InMemory).expect("blob store"));
2911        let llm: Arc<dyn LlmProvider> =
2912            Arc::new(MockLlmProvider::scripted(vec![AgentAction::Respond {
2913                content: response.to_string(),
2914            }]));
2915        let config = server_config();
2916        let settings = RuntimeMutableSettings::defaults(
2917            config.default_policy.clone(),
2918            config.default_provider.clone(),
2919        );
2920        RuntimeState::new(
2921            AgentEngine::new(llm, memory.clone()),
2922            memory,
2923            blob_store,
2924            config,
2925            settings,
2926        )
2927    }
2928
2929    #[tokio::test]
2930    async fn webhook_route_converts_json_request_into_agent_trigger() {
2931        let state = runtime_state_with_mock("processed");
2932
2933        let response = app(state)
2934            .oneshot(
2935                Request::post("/triggers/webhook/github")
2936                    .header("content-type", "application/json")
2937                    .body(Body::from(
2938                        serde_json::to_vec(&WebhookIngressRequest {
2939                            session_id: "runtime-session".to_string(),
2940                            payload: serde_json::json!({"action": "opened"}),
2941                            attachments: Vec::new(),
2942                            granted_scopes: BTreeSet::new(),
2943                            idempotency_key: Some("abc".to_string()),
2944                            provider: None,
2945                            policy_override: None,
2946                        })
2947                        .expect("request json"),
2948                    ))
2949                    .expect("request"),
2950            )
2951            .await
2952            .expect("response");
2953
2954        assert_eq!(response.status(), StatusCode::OK);
2955        let bytes = axum::body::to_bytes(response.into_body(), usize::MAX)
2956            .await
2957            .expect("body");
2958        let run: RuntimeRunResult = serde_json::from_slice(&bytes).expect("run result");
2959        assert_eq!(run.outcome.stop_reason, StopReason::Responded);
2960        assert_eq!(run.outcome.response.as_deref(), Some("processed"));
2961        assert!(!run.advances.is_empty());
2962    }
2963
2964    #[tokio::test]
2965    async fn capabilities_route_exposes_runtime_surface() {
2966        let state = runtime_state_with_mock("processed");
2967
2968        let response = app(state)
2969            .oneshot(
2970                Request::get("/capabilities")
2971                    .body(Body::empty())
2972                    .expect("request"),
2973            )
2974            .await
2975            .expect("response");
2976
2977        assert_eq!(response.status(), StatusCode::OK);
2978        let bytes = axum::body::to_bytes(response.into_body(), usize::MAX)
2979            .await
2980            .expect("body");
2981        let capabilities: RuntimeCapabilities =
2982            serde_json::from_slice(&bytes).expect("capabilities");
2983        assert!(capabilities.streaming);
2984        assert!(capabilities.approvals);
2985        assert!(capabilities.wake_support);
2986        assert!(capabilities.learning_support);
2987        assert!(
2988            capabilities
2989                .default_scopes
2990                .contains(&"tool:run".to_string())
2991        );
2992    }
2993
2994    #[tokio::test]
2995    async fn session_view_projects_records_for_control_room() {
2996        let state = runtime_state_with_mock("processed");
2997        let router = app(state.clone());
2998
2999        let response = router
3000            .oneshot(
3001                Request::post("/triggers/webhook/github")
3002                    .header("content-type", "application/json")
3003                    .body(Body::from(
3004                        serde_json::to_vec(&WebhookIngressRequest {
3005                            session_id: "view-session".to_string(),
3006                            payload: serde_json::json!({"action": "opened"}),
3007                            attachments: Vec::new(),
3008                            granted_scopes: BTreeSet::new(),
3009                            idempotency_key: None,
3010                            provider: None,
3011                            policy_override: None,
3012                        })
3013                        .expect("request json"),
3014                    ))
3015                    .expect("request"),
3016            )
3017            .await
3018            .expect("response");
3019        assert_eq!(response.status(), StatusCode::OK);
3020
3021        let view_response = app(state)
3022            .oneshot(
3023                Request::get("/sessions/view-session/view")
3024                    .body(Body::empty())
3025                    .expect("request"),
3026            )
3027            .await
3028            .expect("response");
3029
3030        assert_eq!(view_response.status(), StatusCode::OK);
3031        let bytes = axum::body::to_bytes(view_response.into_body(), usize::MAX)
3032            .await
3033            .expect("body");
3034        let view: SessionView = serde_json::from_slice(&bytes).expect("session view");
3035        assert_eq!(view.status, SessionStatus::Completed);
3036        assert_eq!(view.activity_state, SessionActivityState::Idle);
3037        assert!(view.record_count >= 3);
3038        assert!(view.current_focus.is_some());
3039        assert!(
3040            view.timeline
3041                .iter()
3042                .any(|item| matches!(item, TimelineItem::AssistantResponse { .. }))
3043        );
3044        assert!(!view.self_improvement.reflections.is_empty());
3045    }
3046
3047    #[tokio::test]
3048    async fn session_list_views_include_presence_metadata() {
3049        let state = runtime_state_with_mock("processed");
3050        let router = app(state.clone());
3051
3052        let response = router
3053            .oneshot(
3054                Request::post("/triggers/human/telegram:42")
3055                    .header("content-type", "application/json")
3056                    .body(Body::from(
3057                        serde_json::to_vec(&HumanInputIngressRequest {
3058                            session_id: "presence-session".to_string(),
3059                            content: "check status".to_string(),
3060                            attachments: Vec::new(),
3061                            granted_scopes: BTreeSet::new(),
3062                            idempotency_key: None,
3063                            provider: None,
3064                            policy_override: None,
3065                        })
3066                        .expect("request json"),
3067                    ))
3068                    .expect("request"),
3069            )
3070            .await
3071            .expect("response");
3072        assert_eq!(response.status(), StatusCode::OK);
3073
3074        let list_response = app(state)
3075            .oneshot(
3076                Request::get("/sessions/views")
3077                    .body(Body::empty())
3078                    .expect("request"),
3079            )
3080            .await
3081            .expect("response");
3082
3083        assert_eq!(list_response.status(), StatusCode::OK);
3084        let bytes = axum::body::to_bytes(list_response.into_body(), usize::MAX)
3085            .await
3086            .expect("body");
3087        let views: Vec<SessionListItemView> =
3088            serde_json::from_slice(&bytes).expect("session list views");
3089        let view = views
3090            .iter()
3091            .find(|session| session.session_id == "presence-session")
3092            .expect("presence session view");
3093        assert_eq!(view.status, SessionStatus::Completed);
3094        assert_eq!(view.activity_state, SessionActivityState::Idle);
3095        assert!(view.current_focus.is_some());
3096        assert_eq!(view.active_channel_ids, vec!["telegram".to_string()]);
3097        assert!(view.record_count >= 3);
3098    }
3099
3100    #[tokio::test]
3101    async fn multipart_webhook_persists_blob_attachment_references() {
3102        let memory: Arc<dyn MemoryStore> = Arc::new(InMemoryMemoryStore::new());
3103        let blob_store: Arc<dyn BlobStore> =
3104            Arc::from(build_blob_store(&BlobBootstrapConfig::InMemory).expect("blob store"));
3105        let llm: Arc<dyn LlmProvider> =
3106            Arc::new(MockLlmProvider::scripted(vec![AgentAction::Respond {
3107                content: "processed".to_string(),
3108            }]));
3109        let mut config = server_config();
3110        config.default_policy.max_inline_attachment_bytes = 4;
3111        let settings = RuntimeMutableSettings::defaults(
3112            config.default_policy.clone(),
3113            config.default_provider.clone(),
3114        );
3115        let state = RuntimeState::new(
3116            AgentEngine::new(llm, memory.clone()),
3117            memory.clone(),
3118            blob_store,
3119            config,
3120            settings,
3121        );
3122
3123        let boundary = "rain-engine-boundary";
3124        let body = format!(
3125            "--{boundary}\r\nContent-Disposition: form-data; name=\"session_id\"\r\n\r\nmulti-session\r\n\
3126--{boundary}\r\nContent-Disposition: form-data; name=\"payload\"\r\n\r\n{{\"event\":\"upload\"}}\r\n\
3127--{boundary}\r\nContent-Disposition: form-data; name=\"attachment\"; filename=\"schema.png\"\r\nContent-Type: image/png\r\n\r\n123456789\r\n\
3128--{boundary}--\r\n"
3129        );
3130
3131        let response = app(state.clone())
3132            .oneshot(
3133                Request::post("/triggers/webhook/files")
3134                    .header(
3135                        "content-type",
3136                        format!("multipart/form-data; boundary={boundary}"),
3137                    )
3138                    .body(Body::from(body.into_bytes()))
3139                    .expect("request"),
3140            )
3141            .await
3142            .expect("response");
3143
3144        assert_eq!(response.status(), StatusCode::OK);
3145        let snapshot = state
3146            .memory()
3147            .load_session("multi-session")
3148            .await
3149            .expect("session");
3150        let trigger = snapshot
3151            .records
3152            .into_iter()
3153            .find_map(|record| match record {
3154                SessionRecord::Trigger(record) => Some(record.trigger),
3155                _ => None,
3156            })
3157            .expect("trigger");
3158        match trigger {
3159            AgentTrigger::Webhook { attachments, .. } => {
3160                assert_eq!(attachments.len(), 1);
3161                assert!(matches!(
3162                    attachments[0].content,
3163                    rain_engine_core::AttachmentContent::Blob { .. }
3164                ));
3165            }
3166            other => panic!("unexpected trigger: {other:?}"),
3167        }
3168    }
3169
3170    #[tokio::test]
3171    async fn approval_route_resumes_suspended_native_skill() {
3172        let memory: Arc<dyn MemoryStore> = Arc::new(InMemoryMemoryStore::new());
3173        let blob_store: Arc<dyn BlobStore> =
3174            Arc::from(build_blob_store(&BlobBootstrapConfig::InMemory).expect("blob store"));
3175        let llm: Arc<dyn LlmProvider> = Arc::new(MockLlmProvider::scripted(vec![
3176            AgentAction::CallSkills(vec![PlannedSkillCall {
3177                call_id: "native-call".to_string(),
3178                name: "dangerous_native".to_string(),
3179                args: json!({"apply": true}),
3180                priority: 0,
3181                depends_on: Vec::new(),
3182                retry_policy: Default::default(),
3183                dry_run: false,
3184            }]),
3185            AgentAction::Respond {
3186                content: "completed".to_string(),
3187            },
3188        ]));
3189        let config = server_config();
3190        let settings = RuntimeMutableSettings::defaults(
3191            config.default_policy.clone(),
3192            config.default_provider.clone(),
3193        );
3194        let state = RuntimeState::new(
3195            AgentEngine::new(llm, memory.clone()),
3196            memory,
3197            blob_store,
3198            config,
3199            settings,
3200        );
3201        state.engine().register_native_skill(
3202            SkillManifest {
3203                name: "dangerous_native".to_string(),
3204                description: "Requires approval".to_string(),
3205                input_schema: json!({"type":"object"}),
3206                required_scopes: vec!["tool:run".to_string()],
3207                capability_grants: vec![],
3208                resource_policy: rain_engine_core::ResourcePolicy::default_for_tools(),
3209                approval_required: true,
3210                circuit_breaker_threshold: 0.5,
3211            },
3212            Arc::new(ApprovalNativeSkill),
3213        );
3214
3215        let start = app(state.clone())
3216            .oneshot(
3217                Request::post("/triggers/webhook/github")
3218                    .header("content-type", "application/json")
3219                    .body(Body::from(
3220                        serde_json::to_vec(&WebhookIngressRequest {
3221                            session_id: "approval-session".to_string(),
3222                            payload: json!({"action": "deploy"}),
3223                            attachments: Vec::new(),
3224                            granted_scopes: BTreeSet::from(["tool:run".to_string()]),
3225                            idempotency_key: None,
3226                            provider: None,
3227                            policy_override: None,
3228                        })
3229                        .expect("request json"),
3230                    ))
3231                    .expect("request"),
3232            )
3233            .await
3234            .expect("response");
3235
3236        let start_bytes = axum::body::to_bytes(start.into_body(), usize::MAX)
3237            .await
3238            .expect("body");
3239        let suspended: RuntimeRunResult = serde_json::from_slice(&start_bytes).expect("outcome");
3240        assert_eq!(suspended.outcome.stop_reason, StopReason::Suspended);
3241        let resume_token = suspended.outcome.resume_token.expect("resume token").0;
3242
3243        let resume = app(state)
3244            .oneshot(
3245                Request::post("/triggers/approval")
3246                    .header("content-type", "application/json")
3247                    .body(Body::from(
3248                        serde_json::to_vec(&ApprovalIngressRequest {
3249                            session_id: "approval-session".to_string(),
3250                            resume_token,
3251                            decision: ApprovalDecision::Approved,
3252                            metadata: json!({"approved_by": "tester"}),
3253                            granted_scopes: BTreeSet::from(["tool:run".to_string()]),
3254                            provider: None,
3255                            policy_override: None,
3256                        })
3257                        .expect("request json"),
3258                    ))
3259                    .expect("request"),
3260            )
3261            .await
3262            .expect("response");
3263
3264        let resume_bytes = axum::body::to_bytes(resume.into_body(), usize::MAX)
3265            .await
3266            .expect("body");
3267        let resumed: RuntimeRunResult = serde_json::from_slice(&resume_bytes).expect("outcome");
3268        assert_eq!(resumed.outcome.stop_reason, StopReason::Responded);
3269        assert_eq!(resumed.outcome.response.as_deref(), Some("completed"));
3270    }
3271
3272    #[tokio::test]
3273    async fn invalid_runtime_config_fails_fast() {
3274        let result = build_runtime_state(RuntimeBootstrapConfig {
3275            server: server_config(),
3276            store: StoreBootstrapConfig::Sqlite {
3277                database_url: "".to_string(),
3278            },
3279            cache: None,
3280            blob: BlobBootstrapConfig::InMemory,
3281            provider: ProviderBootstrapConfig::Mock {
3282                response: "processed".to_string(),
3283            },
3284            enable_research_planner: false,
3285        })
3286        .await;
3287
3288        match result {
3289            Ok(_) => panic!("expected config error"),
3290            Err(error) => assert!(error.to_string().contains("must not be empty")),
3291        }
3292    }
3293
3294    #[test]
3295    fn local_directory_blob_store_rejects_invalid_uri() {
3296        let error = LocalFileBlobStore::path_from_uri("memory://abc").expect_err("error");
3297        assert_eq!(error.message, "unsupported local blob uri");
3298    }
3299}