1use async_trait::async_trait;
7use axum::{
8 Json, Router,
9 body::to_bytes,
10 extract::{Path, Query, Request, State},
11 http::{StatusCode, header::CONTENT_TYPE},
12 response::{
13 IntoResponse,
14 sse::{Event, KeepAlive, Sse},
15 },
16 routing::{get, post},
17};
18use futures_util::stream;
19use rain_engine_blob::{BlobBackendConfig, build_blob_store};
20use rain_engine_core::{
21 AdvanceRequest, AgentEngine, AgentStateSnapshot, AgentTrigger, ApprovalDecision, AttachmentRef,
22 BlobStore, ContinueRequest, CorrelationId, EngineError, EngineOutcome, EnginePolicy,
23 InMemoryMemoryStore, LlmProvider, MemoryStore, MockLlmProvider, MultimodalPayload, NativeSkill,
24 PendingApprovalRecord, ProcessRequest, ProviderRequestConfig, RecordPageQuery, ResourcePolicy,
25 SessionListQuery, SessionRecord, SessionSnapshot, SkillCapability, SkillDefinition,
26 SkillExecutionError, SkillFailureKind, SkillInvocation, SkillManifest, SkillStore, StopReason,
27 ToolCallRecord, ToolResultRecord, WakeId, unix_time_ms,
28};
29use rain_engine_openai::{OpenAiCompatibleConfig, OpenAiCompatibleProvider};
30use rain_engine_provider_gemini::{GeminiAuth, GeminiConfig, GeminiProvider};
31use rain_engine_store_pg::PgMemoryStore;
32use rain_engine_store_sqlite::SqliteMemoryStore;
33use serde::{Deserialize, Serialize, de::DeserializeOwned};
34use serde_json::Value;
35use std::collections::{BTreeMap, BTreeSet, HashSet};
36use std::net::SocketAddr;
37use std::sync::Arc;
38use std::time::Duration;
39use thiserror::Error;
40use tracing::{error, info};
41use uuid::Uuid;
42
/// Byte budget of 64 MiB (`64 * 1024 * 1024`).
///
/// NOTE(review): the name suggests this caps the size of ingress request
/// bodies; the use site is not visible in this part of the file — confirm.
const MAX_INGRESS_BODY_BYTES: usize = 64 * 1024 * 1024;
44
/// Serializable request carrying a webhook payload for a session.
///
/// When deserializing, only `session_id` and `payload` are mandatory; every
/// other field falls back to its `Default` value when omitted.
#[typeshare::typeshare]
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct WebhookIngressRequest {
    /// Identifier of the session the request is addressed to.
    pub session_id: String,
    /// Arbitrary JSON payload.
    pub payload: Value,
    /// Multimodal attachments; empty when omitted.
    #[serde(default)]
    pub attachments: Vec<MultimodalPayload>,
    /// Scope strings supplied with the request; empty when omitted.
    #[serde(default)]
    pub granted_scopes: BTreeSet<String>,
    /// Optional idempotency key; `None` when omitted.
    #[serde(default)]
    pub idempotency_key: Option<String>,
    /// Optional per-request provider configuration.
    // NOTE(review): presumably gated by `RuntimeServerConfig::allow_provider_overrides` — confirm in the handler.
    #[serde(default)]
    pub provider: Option<ProviderRequestConfig>,
    /// Optional per-request engine policy.
    // NOTE(review): presumably gated by `RuntimeServerConfig::allow_policy_overrides` — confirm in the handler.
    #[serde(default)]
    pub policy_override: Option<EnginePolicy>,
}
61
/// Serializable request carrying an approval decision for a session.
///
/// `session_id`, `resume_token`, and `decision` are mandatory when
/// deserializing; the remaining fields default when omitted.
#[typeshare::typeshare]
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct ApprovalIngressRequest {
    /// Identifier of the session the request is addressed to.
    pub session_id: String,
    /// Token naming the pending approval being answered.
    pub resume_token: String,
    /// The decision being submitted.
    pub decision: ApprovalDecision,
    /// Free-form JSON metadata; `Value::Null` when omitted.
    #[serde(default)]
    pub metadata: Value,
    /// Scope strings supplied with the request; empty when omitted.
    #[serde(default)]
    pub granted_scopes: BTreeSet<String>,
    /// Optional per-request provider configuration.
    #[serde(default)]
    pub provider: Option<ProviderRequestConfig>,
    /// Optional per-request engine policy.
    #[serde(default)]
    pub policy_override: Option<EnginePolicy>,
}
77
/// Serializable request carrying an event payload for a session.
///
/// Field-for-field identical to [`WebhookIngressRequest`]; only `session_id`
/// and `payload` are mandatory when deserializing.
#[typeshare::typeshare]
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct EventIngressRequest {
    /// Identifier of the session the request is addressed to.
    pub session_id: String,
    /// Arbitrary JSON payload.
    pub payload: Value,
    /// Multimodal attachments; empty when omitted.
    #[serde(default)]
    pub attachments: Vec<MultimodalPayload>,
    /// Scope strings supplied with the request; empty when omitted.
    #[serde(default)]
    pub granted_scopes: BTreeSet<String>,
    /// Optional idempotency key; `None` when omitted.
    #[serde(default)]
    pub idempotency_key: Option<String>,
    /// Optional per-request provider configuration.
    #[serde(default)]
    pub provider: Option<ProviderRequestConfig>,
    /// Optional per-request engine policy.
    #[serde(default)]
    pub policy_override: Option<EnginePolicy>,
}
94
/// Serializable request carrying textual human input for a session.
///
/// Only `session_id` and `content` are mandatory when deserializing.
#[typeshare::typeshare]
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct HumanInputIngressRequest {
    /// Identifier of the session the request is addressed to.
    pub session_id: String,
    /// The text content of the input.
    pub content: String,
    /// Multimodal attachments; empty when omitted.
    #[serde(default)]
    pub attachments: Vec<MultimodalPayload>,
    /// Scope strings supplied with the request; empty when omitted.
    #[serde(default)]
    pub granted_scopes: BTreeSet<String>,
    /// Optional idempotency key; `None` when omitted.
    #[serde(default)]
    pub idempotency_key: Option<String>,
    /// Optional per-request provider configuration.
    #[serde(default)]
    pub provider: Option<ProviderRequestConfig>,
    /// Optional per-request engine policy.
    #[serde(default)]
    pub policy_override: Option<EnginePolicy>,
}
111
/// Serializable request describing a scheduled wake for a session.
///
/// `session_id`, `wake_id`, `due_at`, and `reason` are mandatory when
/// deserializing; the remaining fields default when omitted.
#[typeshare::typeshare]
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct ScheduledWakeIngressRequest {
    /// Identifier of the session the request is addressed to.
    pub session_id: String,
    /// Identifier of the wake.
    pub wake_id: String,
    /// Due time. Serde encodes `SystemTime` as a struct with
    /// `secs_since_epoch` and `nanos_since_epoch` fields.
    pub due_at: std::time::SystemTime,
    /// Free-text reason for the wake.
    pub reason: String,
    /// Scope strings supplied with the request; empty when omitted.
    #[serde(default)]
    pub granted_scopes: BTreeSet<String>,
    /// Optional per-request provider configuration.
    #[serde(default)]
    pub provider: Option<ProviderRequestConfig>,
    /// Optional per-request engine policy.
    #[serde(default)]
    pub policy_override: Option<EnginePolicy>,
}
126
/// Serializable request carrying the result of a delegation for a session.
///
/// `session_id`, `correlation_id`, and `payload` are mandatory when
/// deserializing; the remaining fields default when omitted.
#[typeshare::typeshare]
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct DelegationResultIngressRequest {
    /// Identifier of the session the request is addressed to.
    pub session_id: String,
    /// Correlation identifier, carried as a plain string.
    pub correlation_id: String,
    /// Arbitrary JSON result payload.
    pub payload: Value,
    /// Free-form JSON metadata; `Value::Null` when omitted.
    #[serde(default)]
    pub metadata: Value,
    /// Scope strings supplied with the request; empty when omitted.
    #[serde(default)]
    pub granted_scopes: BTreeSet<String>,
    /// Optional per-request provider configuration.
    #[serde(default)]
    pub provider: Option<ProviderRequestConfig>,
    /// Optional per-request engine policy.
    #[serde(default)]
    pub policy_override: Option<EnginePolicy>,
}
142
/// Serializable request describing a skill to install.
///
/// Carries the full manifest plus three optional ways of locating the WASM
/// module. The `Option` fields have no `#[serde(default)]`; serde still
/// treats a missing `Option` field as `None`.
// NOTE(review): which source wins when several are set is decided by the
// consumer of this type, which is not visible here.
#[typeshare::typeshare]
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct InstallSkillRequest {
    /// Manifest of the skill to install.
    pub manifest: rain_engine_core::SkillManifest,
    /// Optional URL of the WASM module.
    pub wasm_url: Option<String>,
    /// Optional base64-encoded WASM module.
    pub wasm_base64: Option<String>,
    /// Optional filesystem path of the WASM module.
    pub file_path: Option<String>,
}
151
/// Native skill that installs WASM skills into an [`AgentEngine`].
///
/// Holds the engine handle on which its `NativeSkill::execute`
/// implementation calls `register_wasm_skill_persistent`.
pub struct SkillInstallerSkill {
    /// Engine that newly installed skills are registered on.
    engine: AgentEngine,
}
155
156impl SkillInstallerSkill {
157 pub fn new(engine: AgentEngine) -> Self {
158 Self { engine }
159 }
160}
161
162pub fn install_skill_manifest() -> SkillManifest {
163 SkillManifest {
164 name: "install_skill".to_string(),
165 description: "Installs a new WASM-based skill from a URL.".to_string(),
166 input_schema: serde_json::json!({
167 "type": "object",
168 "properties": {
169 "name": { "type": "string" },
170 "description": { "type": "string" },
171 "wasm_url": { "type": "string" },
172 "input_schema": { "type": "object" }
173 },
174 "required": ["name", "description", "wasm_url", "input_schema"]
175 }),
176 required_scopes: vec!["operator:skills".to_string()],
177 capability_grants: vec![],
178 resource_policy: ResourcePolicy::default_for_tools(),
179 approval_required: true,
180 circuit_breaker_threshold: 0.5,
181 }
182}
183
184#[async_trait]
185impl NativeSkill for SkillInstallerSkill {
186 async fn execute(&self, invocation: SkillInvocation) -> Result<Value, SkillExecutionError> {
187 let name = invocation
188 .args
189 .get("name")
190 .and_then(|v| v.as_str())
191 .ok_or_else(|| {
192 SkillExecutionError::new(SkillFailureKind::InvalidArguments, "missing name")
193 })?;
194 let description = invocation
195 .args
196 .get("description")
197 .and_then(|v| v.as_str())
198 .ok_or_else(|| {
199 SkillExecutionError::new(SkillFailureKind::InvalidArguments, "missing description")
200 })?;
201 let wasm_url = invocation.args.get("wasm_url").and_then(|v| v.as_str());
202 let wasm_base64 = invocation.args.get("wasm_base64").and_then(|v| v.as_str());
203 let file_path = invocation.args.get("file_path").and_then(|v| v.as_str());
204
205 if wasm_url.is_none() && wasm_base64.is_none() && file_path.is_none() {
206 return Err(SkillExecutionError::new(
207 SkillFailureKind::InvalidArguments,
208 "missing wasm_url, wasm_base64, or file_path",
209 ));
210 }
211
212 let input_schema = invocation
213 .args
214 .get("input_schema")
215 .cloned()
216 .ok_or_else(|| {
217 SkillExecutionError::new(SkillFailureKind::InvalidArguments, "missing input_schema")
218 })?;
219
220 let manifest = SkillManifest {
221 name: name.to_string(),
222 description: description.to_string(),
223 input_schema,
224 required_scopes: vec!["tool:run".to_string()],
225 capability_grants: vec![
226 SkillCapability::HttpOutbound {
227 allow_hosts: vec![],
228 },
229 SkillCapability::StructuredLog,
230 ],
231 resource_policy: ResourcePolicy::default_for_tools(),
232 approval_required: false,
233 circuit_breaker_threshold: 0.5,
234 };
235
236 let wasm_bytes = if let Some(url) = wasm_url {
238 let client = reqwest::Client::new();
239 client
240 .get(url)
241 .send()
242 .await
243 .map_err(|err| {
244 SkillExecutionError::new(
245 SkillFailureKind::Internal,
246 format!("Download failed: {}", err),
247 )
248 })?
249 .bytes()
250 .await
251 .map_err(|err| {
252 SkillExecutionError::new(
253 SkillFailureKind::Internal,
254 format!("Read failed: {}", err),
255 )
256 })?
257 .to_vec()
258 } else if let Some(b64) = wasm_base64 {
259 use base64::{Engine as _, engine::general_purpose};
260 general_purpose::STANDARD.decode(b64).map_err(|err| {
261 SkillExecutionError::new(
262 SkillFailureKind::InvalidArguments,
263 format!("Base64 decode failed: {}", err),
264 )
265 })?
266 } else if let Some(path) = file_path {
267 tokio::fs::read(path).await.map_err(|err| {
268 SkillExecutionError::new(
269 SkillFailureKind::InvalidArguments,
270 format!("File read failed: {}", err),
271 )
272 })?
273 } else {
274 return Err(SkillExecutionError::new(
275 SkillFailureKind::InvalidArguments,
276 "missing wasm_url, wasm_base64, or file_path",
277 ));
278 };
279
280 let config = rain_engine_wasm::WasmSkillConfig {
281 manifest: manifest.clone(),
282 wasm_bytes: Arc::new(wasm_bytes.to_vec()),
283 capabilities: Arc::new(
284 rain_engine_wasm::InMemoryCapabilityHost::new().with_http_client(),
285 ),
286 };
287
288 let executor = rain_engine_wasm::WasmSkillExecutor::new(config).map_err(|err| {
289 SkillExecutionError::new(SkillFailureKind::Internal, format!("Init failed: {}", err))
290 })?;
291
292 self.engine
293 .register_wasm_skill_persistent(manifest, Arc::new(executor), wasm_bytes.to_vec())
294 .await
295 .map_err(|err| {
296 SkillExecutionError::new(
297 SkillFailureKind::Internal,
298 format!("Storage failed: {err}"),
299 )
300 })?;
301
302 Ok(serde_json::json!({
303 "status": "success",
304 "message": format!("Skill {} installed successfully", name)
305 }))
306 }
307}
308
/// Server-level configuration of the runtime.
///
/// All fields except `async_ingress` are mandatory when deserializing.
#[typeshare::typeshare]
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct RuntimeServerConfig {
    /// Socket address for the server.
    pub bind_address: SocketAddr,
    /// Request timeout, in milliseconds per the field name.
    pub request_timeout_ms: u64,
    /// Engine policy used by default.
    pub default_policy: EnginePolicy,
    /// Flag for per-request policy overrides.
    // NOTE(review): presumably gates the `policy_override` request fields; enforcement is not visible here.
    pub allow_policy_overrides: bool,
    /// Flag for per-request provider overrides.
    // NOTE(review): presumably gates the `provider` request fields; enforcement is not visible here.
    pub allow_provider_overrides: bool,
    /// Provider request configuration used by default.
    pub default_provider: ProviderRequestConfig,
    /// Async-ingress flag; `false` when omitted.
    #[serde(default)]
    pub async_ingress: bool,
}
321
/// Selects the memory-store backend built at bootstrap.
///
/// Internally tagged: the variant is named by a `"kind"` field in snake_case
/// (`"in_memory"`, `"sqlite"`, `"postgres"`) next to the variant's own fields.
#[typeshare::typeshare]
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
#[serde(tag = "kind", rename_all = "snake_case")]
pub enum StoreBootstrapConfig {
    /// Process-local store with no connection settings.
    InMemory,
    /// SQLite store reached through `database_url`.
    Sqlite { database_url: String },
    /// Postgres store reached through `database_url`.
    Postgres { database_url: String },
}
330
/// Optional cache settings read at bootstrap.
#[typeshare::typeshare]
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct CacheBootstrapConfig {
    /// Valkey connection URL. `build_runtime_state` configures a Valkey state
    /// cache only when this is `Some`.
    pub valkey_url: Option<String>,
}
336
/// Blob-store bootstrap settings; an alias for `rain_engine_blob::BlobBackendConfig`.
pub type BlobBootstrapConfig = BlobBackendConfig;
338
/// How the Gemini `credential` string is interpreted.
///
/// Serialized in snake_case: `"api_key"` or `"bearer_token"`.
#[typeshare::typeshare]
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
#[serde(rename_all = "snake_case")]
pub enum GeminiAuthMode {
    /// Credential is passed as `GeminiAuth::ApiKey`.
    ApiKey,
    /// Credential is passed as `GeminiAuth::BearerToken`.
    BearerToken,
}
346
/// Selects and configures the LLM provider built at bootstrap.
///
/// Internally tagged: the variant is named by a `"kind"` field in snake_case
/// (`"mock"`, `"open_ai_compatible"`, `"gemini"`).
///
/// Note that the serde tag for the OpenAI variant (`"open_ai_compatible"`)
/// differs from the `provider_kind` string that `build_runtime_state`
/// records for it (`"openai_compatible"`).
#[typeshare::typeshare]
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
#[serde(tag = "kind", rename_all = "snake_case")]
pub enum ProviderBootstrapConfig {
    /// Mock provider that always responds with the fixed `response` text.
    Mock {
        response: String,
    },
    /// Provider speaking an OpenAI-compatible API.
    OpenAiCompatible {
        base_url: String,
        api_key: String,
        model: String,
        /// `None` when omitted.
        #[serde(default)]
        temperature: Option<f32>,
        /// `None` when omitted.
        #[serde(default)]
        max_tokens: Option<u32>,
        /// Falls back to `default_system_prompt()` when omitted.
        #[serde(default = "default_system_prompt")]
        system_prompt: String,
    },
    /// Gemini provider.
    Gemini {
        base_url: String,
        /// Selects how `credential` is interpreted.
        auth_mode: GeminiAuthMode,
        credential: String,
        model: String,
        /// `None` when omitted.
        #[serde(default)]
        temperature: Option<f32>,
        /// `None` when omitted.
        #[serde(default)]
        max_tokens: Option<u32>,
        /// Falls back to `default_gemini_system_instruction()` when omitted.
        #[serde(default = "default_gemini_system_instruction")]
        system_instruction: String,
        /// Falls back to `default_gemini_provider_name()` when omitted.
        #[serde(default = "default_gemini_provider_name")]
        provider_name: String,
        /// Falls back to `default_gemini_embedding_model()` when omitted.
        #[serde(default = "default_gemini_embedding_model")]
        embedding_model: String,
    },
}
382
/// Serde default for `ProviderBootstrapConfig::OpenAiCompatible::system_prompt`.
fn default_system_prompt() -> String {
    String::from(
        "You are a server-side automation agent. Prefer tool calls when available. When replying directly, return plain text or JSON with type=yield.",
    )
}
386
/// Serde default for `ProviderBootstrapConfig::Gemini::system_instruction`.
fn default_gemini_system_instruction() -> String {
    String::from(
        "You are a multimodal server-side automation agent. Use tools when they can complete the task precisely.",
    )
}
390
/// Serde default for `ProviderBootstrapConfig::Gemini::provider_name`.
fn default_gemini_provider_name() -> String {
    String::from("gemini")
}
394
/// Serde default for `ProviderBootstrapConfig::Gemini::embedding_model`.
fn default_gemini_embedding_model() -> String {
    String::from("text-embedding-004")
}
398
/// Top-level bootstrap configuration consumed by `build_runtime_state`.
///
/// `cache` and `enable_research_planner` default when omitted; the other
/// fields are mandatory when deserializing.
#[typeshare::typeshare]
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct RuntimeBootstrapConfig {
    /// Server-level settings, moved into the resulting `RuntimeState`.
    pub server: RuntimeServerConfig,
    /// Memory-store backend selection.
    pub store: StoreBootstrapConfig,
    /// Optional cache settings; `None` when omitted.
    #[serde(default)]
    pub cache: Option<CacheBootstrapConfig>,
    /// Blob-store backend settings.
    pub blob: BlobBootstrapConfig,
    /// LLM provider selection.
    pub provider: ProviderBootstrapConfig,
    /// When `true`, a `ResearchPlanner` is attached to the engine; `false`
    /// when omitted.
    #[serde(default)]
    pub enable_research_planner: bool,
}
411
/// Cloneable bundle of everything the runtime keeps alongside its engine.
///
/// Built by `RuntimeState::new` and adjusted through the `with_*` builder
/// methods.
#[derive(Clone)]
pub struct RuntimeState {
    /// The agent engine.
    engine: AgentEngine,
    /// Shared memory store handle.
    memory: Arc<dyn MemoryStore>,
    /// Shared blob store handle.
    blob_store: Arc<dyn BlobStore>,
    /// Server-level configuration.
    config: RuntimeServerConfig,
    /// Provider label; `"custom"` unless overridden via `with_provider_kind`.
    provider_kind: String,
    /// Default scopes; `["tool:run"]` unless overridden via `with_default_scopes`.
    default_scopes: Vec<String>,
    /// Channel status entries; empty unless set via `with_channel_statuses`.
    channel_statuses: Vec<RuntimeChannelView>,
    /// Settings that can be changed while the runtime is running.
    settings: RuntimeMutableSettings,
    /// Optional Valkey stream ingress; `None` unless set via `with_ingress`.
    ingress: Option<Arc<rain_engine_ingress::ValkeyStreamIngress>>,
}
424
/// Result of a runtime run: the collected advance results plus an outcome.
// NOTE(review): presumably `outcome` is the final outcome after all `advances`; the producer is not visible here.
#[typeshare::typeshare]
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct RuntimeRunResult {
    /// Advance results gathered during the run.
    pub advances: Vec<rain_engine_core::AdvanceResult>,
    /// Engine outcome reported for the run.
    pub outcome: EngineOutcome,
}
431
/// Coarse status of a session.
///
/// Serialized in snake_case (`"empty"`, `"running"`, …).
// NOTE(review): the rules that map session records to each variant live outside this part of the file.
#[typeshare::typeshare]
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
#[serde(rename_all = "snake_case")]
pub enum SessionStatus {
    Empty,
    Running,
    Completed,
    Suspended,
    Delegated,
    Stopped,
    Failed,
}
444
/// Finer-grained activity state of a session.
///
/// Serialized in snake_case (`"idle"`, `"running_tools"`, `"waiting_human"`, …).
// NOTE(review): the rules that select each variant live outside this part of the file.
#[typeshare::typeshare]
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
#[serde(rename_all = "snake_case")]
pub enum SessionActivityState {
    Idle,
    Reasoning,
    RunningTools,
    WaitingHuman,
    WaitingExternal,
    Scheduled,
    Delegated,
    Errored,
}
458
/// Serializable view of a wake.
// NOTE(review): `*_ms` fields are presumably Unix-epoch milliseconds — confirm against the code that fills them.
#[typeshare::typeshare]
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct WakeView {
    /// Identifier of the wake.
    pub wake_id: String,
    /// Free-text reason.
    pub reason: String,
    /// Status as a free-form string.
    pub status: String,
    /// Time the wake record occurred.
    pub occurred_at_ms: i64,
    /// Due time, when known.
    pub due_at_ms: Option<i64>,
    /// Associated task identifier, when any.
    pub task_id: Option<String>,
}
469
/// Serializable view of a heartbeat.
// NOTE(review): `occurred_at_ms` is presumably Unix-epoch milliseconds — confirm against the code that fills it.
#[typeshare::typeshare]
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct HeartbeatStatusView {
    /// Identifier of the wake.
    pub wake_id: String,
    /// Free-text reason.
    pub reason: String,
    /// Time the heartbeat occurred.
    pub occurred_at_ms: i64,
    /// Optional summary of the outcome.
    pub outcome_summary: Option<String>,
    /// Optional stop reason.
    pub stop_reason: Option<StopReason>,
}
479
/// Compact per-session summary, one entry per session in a listing.
// NOTE(review): `last_activity_at_ms` is presumably Unix-epoch milliseconds — confirm against the code that fills it.
#[typeshare::typeshare]
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct SessionListItemView {
    /// Identifier of the session.
    pub session_id: String,
    /// Coarse status.
    pub status: SessionStatus,
    /// Finer-grained activity state.
    pub activity_state: SessionActivityState,
    /// Current focus text, when any.
    pub current_focus: Option<String>,
    /// Latest provider label, when any.
    pub latest_provider: Option<String>,
    /// Time of the last activity.
    pub last_activity_at_ms: i64,
    /// Whether an approval is pending.
    pub pending_approval: bool,
    /// Whether a wake is pending.
    pub pending_wake: bool,
    /// Count of unread events.
    pub unread_event_count: usize,
    /// Identifiers of active channels.
    pub active_channel_ids: Vec<String>,
    /// Number of records in the session.
    pub record_count: usize,
}
495
/// Status of a runtime channel.
///
/// Serialized in snake_case: `"connected"`, `"degraded"`, `"disabled"`.
#[typeshare::typeshare]
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
#[serde(rename_all = "snake_case")]
pub enum RuntimeChannelStatus {
    Connected,
    Degraded,
    Disabled,
}
504
/// Serializable description of one runtime channel and its status.
#[typeshare::typeshare]
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct RuntimeChannelView {
    /// Identifier of the channel.
    pub channel_id: String,
    /// Display label.
    pub label: String,
    /// Transport name as a free-form string.
    pub transport: String,
    /// Current status.
    pub status: RuntimeChannelStatus,
    /// Optional free-text detail.
    pub detail: Option<String>,
}
514
/// Describes approval-metadata support; embedded in `RuntimeCapabilities`.
#[typeshare::typeshare]
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct ApprovalMetadataSupport {
    /// Whether structured JSON metadata is supported.
    pub structured_json: bool,
    /// Field names recommended for approval metadata.
    pub recommended_fields: Vec<String>,
}
521
/// Describes upload support; embedded in `RuntimeCapabilities`.
#[typeshare::typeshare]
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct UploadSupport {
    /// Whether multipart uploads are supported.
    pub multipart: bool,
    /// Maximum request size in bytes.
    // NOTE(review): presumably populated from `MAX_INGRESS_BODY_BYTES` — confirm at the construction site.
    pub max_request_bytes: usize,
}
528
/// Serializable view of a pending approval.
// NOTE(review): `created_at_ms` is presumably Unix-epoch milliseconds — confirm against the code that fills it.
#[typeshare::typeshare]
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct ApprovalView {
    /// Token naming this approval; compare `ApprovalIngressRequest::resume_token`.
    pub resume_token: String,
    /// Creation time.
    pub created_at_ms: i64,
    /// Identifier of the associated trigger.
    pub trigger_id: String,
    /// Step number.
    pub step: usize,
    /// Free-text reason.
    pub reason: String,
    /// Planned skill calls awaiting the decision.
    pub pending_calls: Vec<rain_engine_core::PlannedSkillCall>,
}
539
/// Serializable view of one tool call and, once available, its result.
///
/// The `Option` fields stay `None` until the corresponding information
/// exists.
// NOTE(review): `*_ms` fields are presumably Unix-epoch milliseconds — confirm against the code that fills them.
#[typeshare::typeshare]
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct ToolTimelineItem {
    /// Identifier of the call.
    pub call_id: String,
    /// Name of the invoked skill.
    pub skill_name: String,
    /// Step number.
    pub step: usize,
    /// Time the call was made.
    pub called_at_ms: i64,
    /// Time the call finished, when known.
    pub finished_at_ms: Option<i64>,
    /// Backend kind as a free-form string.
    pub backend_kind: String,
    /// JSON arguments of the call.
    pub args: Value,
    /// Success flag, when a result exists.
    pub success: Option<bool>,
    /// Preview of the output, when any.
    pub output_preview: Option<String>,
    /// Failure kind as a string, when any.
    pub failure_kind: Option<String>,
}
554
/// Groups the self-improvement records of a session by record type.
///
/// Every field carries `rain_engine_core` record types unchanged.
#[typeshare::typeshare]
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct SelfImprovementView {
    /// Currently active policy overlay, when any.
    pub active_overlay: Option<rain_engine_core::PolicyOverlay>,
    pub reflections: Vec<rain_engine_core::ReflectionRecord>,
    pub policy_tunings: Vec<rain_engine_core::PolicyTuningRecord>,
    pub strategy_preferences: Vec<rain_engine_core::StrategyPreferenceRecord>,
    pub tool_performance: Vec<rain_engine_core::ToolPerformanceRecord>,
    pub profile_patches: Vec<rain_engine_core::ProfilePatchRecord>,
}
565
/// Groups the tool-execution-graph data of a session.
///
/// Graphs, checkpoints, and validations carry `rain_engine_core` types
/// unchanged.
#[typeshare::typeshare]
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct ExecutionGraphView {
    /// Currently active graph, when any.
    pub active_graph: Option<rain_engine_core::ToolExecutionGraph>,
    /// Graphs recorded for the session.
    pub graphs: Vec<rain_engine_core::ToolExecutionGraph>,
    /// Node checkpoint records.
    pub checkpoints: Vec<rain_engine_core::ToolNodeCheckpointRecord>,
    /// Skill input validation records.
    pub validations: Vec<rain_engine_core::SkillInputValidationRecord>,
    /// Identifiers of blocked calls.
    pub blocked_call_ids: Vec<String>,
}
575
/// One entry of a session timeline.
///
/// Adjacently tagged: each value serializes as
/// `{"type": "<VariantName>", "payload": { ...fields... }}`. There is no
/// `rename_all`, so variant names appear verbatim (for example
/// `"HumanInput"`).
// NOTE(review): every variant has `occurred_at_ms`, presumably Unix-epoch milliseconds — confirm against the code that builds the timeline.
#[typeshare::typeshare]
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
#[serde(tag = "type", content = "payload")]
pub enum TimelineItem {
    /// Input text attributed to `actor_id`.
    HumanInput {
        actor_id: String,
        content: String,
        occurred_at_ms: i64,
    },
    /// Assistant response text together with its stop reason.
    AssistantResponse {
        content: String,
        stop_reason: StopReason,
        occurred_at_ms: i64,
    },
    /// A tool call, including a preformatted rendering of the call.
    ToolCall {
        call_id: String,
        skill_name: String,
        formatted_call: String,
        occurred_at_ms: i64,
    },
    /// The result of a tool call, with a success flag and a preview.
    ToolResult {
        call_id: String,
        skill_name: String,
        success: bool,
        preview: String,
        occurred_at_ms: i64,
    },
    /// An approval request listing the planned calls awaiting a decision.
    ApprovalRequested {
        resume_token: String,
        pending_calls: Vec<rain_engine_core::PlannedSkillCall>,
        occurred_at_ms: i64,
    },
    /// The decision recorded for an approval identified by `resume_token`.
    ApprovalResolved {
        resume_token: String,
        decision: ApprovalDecision,
        occurred_at_ms: i64,
    },
    /// A plan with its summary, candidate actions, confidence, and outcome.
    Plan {
        summary: String,
        candidate_actions: Vec<String>,
        confidence: f64,
        outcome: rain_engine_core::DeliberationOutcome,
        occurred_at_ms: i64,
    },
    /// A tool node checkpoint with its status and attempt number.
    ToolCheckpoint {
        call_id: String,
        skill_name: String,
        status: rain_engine_core::ToolNodeStatus,
        attempt: usize,
        detail: Option<String>,
        occurred_at_ms: i64,
    },
    /// A validation failure for a call, listing the error messages.
    ValidationFailure {
        call_id: String,
        skill_name: String,
        errors: Vec<String>,
        occurred_at_ms: i64,
    },
    /// A learning entry with a label, detail text, and confidence.
    Learning {
        label: String,
        detail: String,
        confidence: f64,
        occurred_at_ms: i64,
    },
    /// A generic system entry with a label and detail text.
    System {
        label: String,
        detail: String,
        occurred_at_ms: i64,
    },
}
646
/// Full serializable view of a single session.
///
/// Combines status, scheduling, approval, timeline, tool, self-improvement,
/// and execution-graph information in one value.
// NOTE(review): `*_ms` fields are presumably Unix-epoch milliseconds — confirm against the code that builds this view.
#[typeshare::typeshare]
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct SessionView {
    /// Identifier of the session.
    pub session_id: String,
    /// Coarse status.
    pub status: SessionStatus,
    /// Finer-grained activity state.
    pub activity_state: SessionActivityState,
    pub current_focus: Option<String>,
    pub current_task_id: Option<String>,
    pub current_task_title: Option<String>,
    pub next_wake_at_ms: Option<i64>,
    pub blocked_reason: Option<String>,
    pub last_human_input_at_ms: Option<i64>,
    pub last_assistant_activity_at_ms: Option<i64>,
    pub active_channel_ids: Vec<String>,
    /// The pending wake, when any.
    pub pending_wake: Option<WakeView>,
    /// Past wakes.
    pub wake_history: Vec<WakeView>,
    /// The most recent heartbeat, when any.
    pub last_heartbeat: Option<HeartbeatStatusView>,
    pub last_sequence_no: Option<i64>,
    /// The latest outcome record, when any.
    pub latest_outcome: Option<rain_engine_core::OutcomeRecord>,
    /// The pending approval, when any.
    pub pending_approval: Option<ApprovalView>,
    /// Agent state snapshot.
    pub state: AgentStateSnapshot,
    /// Timeline entries.
    pub timeline: Vec<TimelineItem>,
    /// Per-tool-call entries.
    pub tool_timeline: Vec<ToolTimelineItem>,
    pub self_improvement: SelfImprovementView,
    pub execution_graph: ExecutionGraphView,
    /// Number of records in the session.
    pub record_count: usize,
    /// Total estimated cost in US dollars, per the field name.
    pub total_estimated_cost_usd: f64,
}
675
/// Serializable description of what the running runtime supports.
// NOTE(review): presumably the response body of the `/capabilities` route; the handler is not visible in this part of the file.
#[typeshare::typeshare]
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct RuntimeCapabilities {
    /// Version string.
    pub version: String,
    /// Provider label.
    pub provider_kind: String,
    /// Default model name, when known.
    pub default_model: Option<String>,
    pub streaming: bool,
    pub approvals: bool,
    pub multipart_uploads: bool,
    pub default_scopes: Vec<String>,
    pub default_policy: EnginePolicy,
    /// Channel descriptions.
    pub channels: Vec<RuntimeChannelView>,
    pub approval_metadata: ApprovalMetadataSupport,
    pub wake_support: bool,
    pub delegation_support: bool,
    pub learning_support: bool,
    pub upload_limits: UploadSupport,
    /// Skill definitions.
    pub skills: Vec<SkillDefinition>,
}
695
/// Serializable snapshot of the mutable runtime settings.
///
/// Produced by `RuntimeMutableSettings::to_view`.
#[typeshare::typeshare]
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct RuntimeSettingsView {
    /// Access policy snapshot for the `shell_exec` slot.
    pub shell_exec: MutableSkillPolicyView,
    /// Access policy snapshot for the `http_fetch` slot.
    pub http_fetch: MutableSkillPolicyView,
    /// Access policy snapshot for the `web_reader` slot.
    pub web_reader: MutableSkillPolicyView,
    /// Copy of the current engine policy.
    pub engine_policy: rain_engine_core::EnginePolicy,
    /// Copy of the current provider request configuration.
    pub provider_config: rain_engine_core::ProviderRequestConfig,
}
705
/// Serializable snapshot of one skill's access policy.
///
/// Produced by `ManagedSkillPolicy::to_view`.
#[typeshare::typeshare]
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct MutableSkillPolicyView {
    /// The policy's `permissive` flag.
    pub permissive: bool,
    /// Allowlist entries, sorted.
    pub allowlist: Vec<String>,
    /// Timeout in whole seconds.
    pub timeout_secs: u64,
}
713
/// Partial update for the mutable runtime settings.
///
/// Every field is optional and `None` when omitted.
/// `RuntimeMutableSettings::apply` changes only the parts that are `Some`.
#[typeshare::typeshare]
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Default)]
pub struct RuntimeSettingsUpdateRequest {
    /// Update for the `shell_exec` access policy.
    #[serde(default)]
    pub shell_exec: Option<MutableSkillPolicyUpdate>,
    /// Update for the `http_fetch` access policy.
    #[serde(default)]
    pub http_fetch: Option<MutableSkillPolicyUpdate>,
    /// Update for the `web_reader` access policy.
    #[serde(default)]
    pub web_reader: Option<MutableSkillPolicyUpdate>,
    /// Replacement engine policy; replaces the whole value.
    #[serde(default)]
    pub engine_policy: Option<rain_engine_core::EnginePolicy>,
    /// Replacement provider request configuration; replaces the whole value.
    #[serde(default)]
    pub provider_config: Option<rain_engine_core::ProviderRequestConfig>,
}
728
/// Partial update for one skill's access policy.
///
/// Both fields are optional and `None` when omitted.
/// `ManagedSkillPolicy::apply` changes only the parts that are `Some`.
#[typeshare::typeshare]
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Default)]
pub struct MutableSkillPolicyUpdate {
    /// New value for the `permissive` flag.
    #[serde(default)]
    pub permissive: Option<bool>,
    /// Replacement allowlist; entries are trimmed and blank ones dropped
    /// when applied.
    #[serde(default)]
    pub allowlist: Option<Vec<String>>,
}
737
/// A shared skill access policy paired with a timeout.
#[derive(Clone)]
pub struct ManagedSkillPolicy {
    /// Shared access policy handle, read and written through async
    /// `read()`/`write()` guards.
    pub access: rain_engine_skills::SharedAccessPolicy,
    /// Timeout stored alongside the policy.
    pub timeout: Duration,
}
743
744impl ManagedSkillPolicy {
745 pub fn new(access: rain_engine_skills::SharedAccessPolicy, timeout: Duration) -> Self {
746 Self { access, timeout }
747 }
748
749 async fn to_view(&self) -> MutableSkillPolicyView {
750 let policy = self.access.read().await;
751 let mut allowlist = policy.allowlist.iter().cloned().collect::<Vec<_>>();
752 allowlist.sort();
753 MutableSkillPolicyView {
754 permissive: policy.permissive,
755 allowlist,
756 timeout_secs: self.timeout.as_secs(),
757 }
758 }
759
760 async fn apply(&self, update: MutableSkillPolicyUpdate) {
761 let mut policy = self.access.write().await;
762 if let Some(permissive) = update.permissive {
763 policy.permissive = permissive;
764 }
765 if let Some(allowlist) = update.allowlist {
766 policy.allowlist = normalize_allowlist(allowlist);
767 }
768 }
769}
770
/// Settings that can be changed while the runtime is running.
///
/// Cloning is cheap for the two `Arc<RwLock<..>>` fields: clones share the
/// same underlying value.
// NOTE(review): the three `ManagedSkillPolicy` fields presumably share state across clones as well, depending on how `SharedAccessPolicy` is defined — confirm.
#[derive(Clone)]
pub struct RuntimeMutableSettings {
    /// Access policy slot for `shell_exec`.
    pub shell_exec: ManagedSkillPolicy,
    /// Access policy slot for `http_fetch`.
    pub http_fetch: ManagedSkillPolicy,
    /// Access policy slot for `web_reader`.
    pub web_reader: ManagedSkillPolicy,
    /// Current engine policy behind an async read/write lock.
    pub engine_policy: std::sync::Arc<tokio::sync::RwLock<rain_engine_core::EnginePolicy>>,
    /// Current provider request configuration behind an async read/write lock.
    pub provider_config:
        std::sync::Arc<tokio::sync::RwLock<rain_engine_core::ProviderRequestConfig>>,
}
780
781impl RuntimeMutableSettings {
782 pub fn defaults(
783 engine_policy: rain_engine_core::EnginePolicy,
784 provider_config: rain_engine_core::ProviderRequestConfig,
785 ) -> Self {
786 let timeout = Duration::from_secs(30);
787 Self {
788 shell_exec: ManagedSkillPolicy::new(
789 rain_engine_skills::shared_access_policy(HashSet::new(), false),
790 timeout,
791 ),
792 http_fetch: ManagedSkillPolicy::new(
793 rain_engine_skills::shared_access_policy(HashSet::new(), false),
794 timeout,
795 ),
796 web_reader: ManagedSkillPolicy::new(
797 rain_engine_skills::shared_access_policy(HashSet::new(), false),
798 timeout,
799 ),
800 engine_policy: std::sync::Arc::new(tokio::sync::RwLock::new(engine_policy)),
801 provider_config: std::sync::Arc::new(tokio::sync::RwLock::new(provider_config)),
802 }
803 }
804
805 async fn to_view(&self) -> RuntimeSettingsView {
806 let engine_policy = self.engine_policy.read().await.clone();
807 let provider_config = self.provider_config.read().await.clone();
808 RuntimeSettingsView {
809 shell_exec: self.shell_exec.to_view().await,
810 http_fetch: self.http_fetch.to_view().await,
811 web_reader: self.web_reader.to_view().await,
812 engine_policy,
813 provider_config,
814 }
815 }
816
817 async fn apply(&self, update: RuntimeSettingsUpdateRequest) {
818 if let Some(shell_exec) = update.shell_exec {
819 self.shell_exec.apply(shell_exec).await;
820 }
821 if let Some(http_fetch) = update.http_fetch {
822 self.http_fetch.apply(http_fetch).await;
823 }
824 if let Some(web_reader) = update.web_reader {
825 self.web_reader.apply(web_reader).await;
826 }
827 if let Some(engine_policy) = update.engine_policy {
828 *self.engine_policy.write().await = engine_policy;
829 }
830 if let Some(provider_config) = update.provider_config {
831 *self.provider_config.write().await = provider_config;
832 }
833 }
834}
835
836impl RuntimeState {
837 pub fn new(
838 engine: AgentEngine,
839 memory: Arc<dyn MemoryStore>,
840 blob_store: Arc<dyn BlobStore>,
841 config: RuntimeServerConfig,
842 settings: RuntimeMutableSettings,
843 ) -> Self {
844 Self {
845 engine,
846 memory,
847 blob_store,
848 config,
849 provider_kind: "custom".to_string(),
850 default_scopes: vec!["tool:run".to_string()],
851 channel_statuses: Vec::new(),
852 settings,
853 ingress: None,
854 }
855 }
856
857 pub fn with_provider_kind(mut self, provider_kind: impl Into<String>) -> Self {
858 self.provider_kind = provider_kind.into();
859 self
860 }
861
862 pub fn with_default_scopes(mut self, default_scopes: Vec<String>) -> Self {
863 self.default_scopes = default_scopes;
864 self
865 }
866
867 pub fn with_channel_statuses(mut self, channel_statuses: Vec<RuntimeChannelView>) -> Self {
868 self.channel_statuses = channel_statuses;
869 self
870 }
871
872 pub fn with_settings(mut self, settings: RuntimeMutableSettings) -> Self {
873 self.settings = settings;
874 self
875 }
876
877 pub fn with_ingress(mut self, ingress: rain_engine_ingress::ValkeyStreamIngress) -> Self {
878 self.ingress = Some(Arc::new(ingress));
879 self
880 }
881
882 pub fn engine(&self) -> &AgentEngine {
883 &self.engine
884 }
885
886 pub fn memory(&self) -> Arc<dyn MemoryStore> {
887 self.memory.clone()
888 }
889
890 pub fn blob_store(&self) -> Arc<dyn BlobStore> {
891 self.blob_store.clone()
892 }
893
894 pub fn settings(&self) -> RuntimeMutableSettings {
895 self.settings.clone()
896 }
897}
898
/// Trims surrounding whitespace from each entry, drops entries that are
/// empty after trimming, and collects the rest into a set (so duplicates
/// collapse).
fn normalize_allowlist(values: Vec<String>) -> HashSet<String> {
    let mut cleaned = HashSet::new();
    for raw in values {
        let entry = raw.trim();
        if !entry.is_empty() {
            cleaned.insert(entry.to_string());
        }
    }
    cleaned
}
906
/// Error returned when the runtime cannot be built from its configuration.
#[derive(Debug, Error)]
pub enum RuntimeConfigError {
    /// Invalid configuration; `Display` prints the wrapped message verbatim.
    #[error("{0}")]
    Invalid(String),
}
912
/// Module-private error type for ingress failures.
// NOTE(review): the code that raises these and maps them to HTTP responses is not visible in this part of the file.
#[derive(Debug, Error)]
enum IngressError {
    /// The request's content type is not supported.
    #[error("unsupported content type")]
    UnsupportedContentType,
    /// The request is malformed; carries a detail message.
    #[error("malformed request: {0}")]
    Malformed(String),
    /// A blob storage operation failed; carries a detail message.
    #[error("blob storage failed: {0}")]
    Blob(String),
}
922
923pub async fn build_runtime_state(
924 config: RuntimeBootstrapConfig,
925) -> Result<RuntimeState, RuntimeConfigError> {
926 let provider_kind = match &config.provider {
927 ProviderBootstrapConfig::Mock { .. } => "mock",
928 ProviderBootstrapConfig::OpenAiCompatible { .. } => "openai_compatible",
929 ProviderBootstrapConfig::Gemini { .. } => "gemini",
930 }
931 .to_string();
932
933 let (memory, skill_store): (Arc<dyn MemoryStore>, Option<Arc<dyn SkillStore>>) =
934 match &config.store {
935 StoreBootstrapConfig::InMemory => (
936 Arc::new(InMemoryMemoryStore::new()),
937 Some(Arc::new(rain_engine_core::InMemorySkillStore::new()) as Arc<dyn SkillStore>),
938 ),
939 StoreBootstrapConfig::Sqlite { database_url } => {
940 if database_url.trim().is_empty() {
941 return Err(RuntimeConfigError::Invalid(
942 "sqlite database_url must not be empty".to_string(),
943 ));
944 }
945 let store = Arc::new(
946 SqliteMemoryStore::connect(database_url)
947 .await
948 .map_err(|err| RuntimeConfigError::Invalid(err.message))?,
949 );
950 (
951 store.clone() as Arc<dyn MemoryStore>,
952 Some(store as Arc<dyn SkillStore>),
953 )
954 }
955 StoreBootstrapConfig::Postgres { database_url } => {
956 if database_url.trim().is_empty() {
957 return Err(RuntimeConfigError::Invalid(
958 "postgres database_url must not be empty".to_string(),
959 ));
960 }
961 (
962 Arc::new(
963 PgMemoryStore::connect_lazy(database_url)
964 .map_err(|err| RuntimeConfigError::Invalid(err.message))?,
965 ),
966 None,
967 )
968 }
969 };
970
971 let blob_store: Arc<dyn BlobStore> = Arc::from(
972 build_blob_store(&config.blob).map_err(|err| RuntimeConfigError::Invalid(err.message))?,
973 );
974
975 let llm: Arc<dyn LlmProvider> = match &config.provider {
976 ProviderBootstrapConfig::Mock { response } => {
977 let response_str = response.clone();
978 Arc::new(MockLlmProvider::dynamic(move |_| {
979 Ok(rain_engine_core::ProviderDecision {
980 action: rain_engine_core::AgentAction::Respond {
981 content: response_str.clone(),
982 },
983 usage: None,
984 cache: None,
985 })
986 }))
987 }
988 ProviderBootstrapConfig::OpenAiCompatible {
989 base_url,
990 api_key,
991 model,
992 temperature,
993 max_tokens,
994 system_prompt,
995 } => Arc::new(
996 OpenAiCompatibleProvider::new(OpenAiCompatibleConfig {
997 base_url: base_url.clone(),
998 api_key: api_key.clone(),
999 default_request: ProviderRequestConfig {
1000 model: Some(model.clone()),
1001 temperature: *temperature,
1002 max_tokens: *max_tokens,
1003 },
1004 system_prompt: system_prompt.clone(),
1005 })
1006 .map_err(|err| RuntimeConfigError::Invalid(err.to_string()))?,
1007 ),
1008 ProviderBootstrapConfig::Gemini {
1009 base_url,
1010 auth_mode,
1011 credential,
1012 model,
1013 temperature,
1014 max_tokens,
1015 system_instruction,
1016 provider_name,
1017 embedding_model,
1018 } => Arc::new(
1019 GeminiProvider::new(GeminiConfig {
1020 base_url: base_url.clone(),
1021 auth: match auth_mode {
1022 GeminiAuthMode::ApiKey => GeminiAuth::ApiKey(credential.clone()),
1023 GeminiAuthMode::BearerToken => GeminiAuth::BearerToken(credential.clone()),
1024 },
1025 default_request: ProviderRequestConfig {
1026 model: Some(model.clone()),
1027 temperature: *temperature,
1028 max_tokens: *max_tokens,
1029 },
1030 system_instruction: system_instruction.clone(),
1031 provider_name: provider_name.clone(),
1032 embedding_model: embedding_model.clone(),
1033 })
1034 .map_err(|err| RuntimeConfigError::Invalid(err.to_string()))?,
1035 ),
1036 };
1037
1038 let mut engine = AgentEngine::new(llm.clone(), memory.clone());
1039
1040 if let Some(cache_config) = &config.cache
1041 && let Some(valkey_url) = &cache_config.valkey_url
1042 {
1043 let valkey_cache = rain_engine_core::ValkeyStateCache::new(valkey_url, "rain")
1044 .map_err(|err| RuntimeConfigError::Invalid(format!("Valkey cache error: {}", err)))?;
1045 engine = engine.with_state_cache(Arc::new(valkey_cache));
1046 tracing::info!("Configured Valkey state projection cache");
1047 }
1048
1049 if let Some(store) = &skill_store {
1050 engine = engine.with_skill_store(store.clone());
1051
1052 if let Ok(persisted_skills) = store.list_skills().await {
1054 info!("Reloading {} persisted skills...", persisted_skills.len());
1055 for (manifest, wasm_bytes) in persisted_skills {
1056 let config = rain_engine_wasm::WasmSkillConfig {
1057 manifest: manifest.clone(),
1058 wasm_bytes: Arc::new(wasm_bytes),
1059 capabilities: Arc::new(
1060 rain_engine_wasm::InMemoryCapabilityHost::new().with_http_client(),
1061 ),
1062 };
1063 match rain_engine_wasm::WasmSkillExecutor::new(config) {
1064 Ok(executor) => {
1065 engine.register_wasm_skill(manifest, Arc::new(executor));
1066 }
1067 Err(err) => {
1068 error!("Failed to reload skill {}: {}", manifest.name, err);
1069 }
1070 }
1071 }
1072 }
1073 }
1074
1075 engine.register_native_skill(
1076 install_skill_manifest(),
1077 Arc::new(SkillInstallerSkill::new(engine.clone())),
1078 );
1079
1080 let settings = RuntimeMutableSettings::defaults(
1081 rain_engine_core::EnginePolicy::default(),
1082 rain_engine_core::ProviderRequestConfig::default(),
1083 );
1084 engine.register_native_skill(
1085 rain_engine_skills::web_reader::manifest(),
1086 Arc::new(
1087 rain_engine_skills::web_reader::WebReaderSkill::with_shared_policy(
1088 settings.web_reader.access.clone(),
1089 settings.web_reader.timeout,
1090 ),
1091 ),
1092 );
1093
1094 if config.enable_research_planner {
1095 engine = engine.with_planner(Arc::new(rain_engine_cognition::ResearchPlanner::new(llm)));
1096 }
1097
1098 Ok(
1099 RuntimeState::new(engine, memory, blob_store, config.server, settings)
1100 .with_provider_kind(provider_kind),
1101 )
1102}
1103
1104pub fn app(state: RuntimeState) -> Router {
1105 Router::new()
1106 .route("/triggers/webhook/{source}", post(handle_webhook))
1108 .route("/triggers/external/{source}", post(handle_external_event))
1109 .route("/triggers/human/{actor_id}", post(handle_human_input))
1110 .route("/triggers/system/{source}", post(handle_system_observation))
1111 .route("/triggers/wake", post(handle_scheduled_wake))
1112 .route("/triggers/approval", post(handle_approval))
1113 .route(
1114 "/triggers/delegation-result",
1115 post(handle_delegation_result),
1116 )
1117 .route("/health", get(handle_health))
1119 .route("/capabilities", get(handle_capabilities))
1120 .route(
1121 "/settings",
1122 get(handle_get_settings).put(handle_update_settings),
1123 )
1124 .route("/sessions", get(handle_list_sessions))
1125 .route("/sessions/views", get(handle_list_session_views))
1126 .route("/sessions/{session_id}", get(handle_get_session))
1127 .route("/sessions/{session_id}/view", get(handle_get_session_view))
1128 .route(
1129 "/sessions/{session_id}/execution-graph",
1130 get(handle_get_execution_graph),
1131 )
1132 .route("/sessions/{session_id}/records", get(handle_list_records))
1133 .route("/capabilities/skills", post(handle_install_skill))
1135 .route("/sessions/{session_id}/stream", get(handle_sse_stream))
1137 .with_state(state)
1138}
1139
1140pub async fn serve(addr: SocketAddr, state: RuntimeState) -> Result<(), std::io::Error> {
1141 let listener = tokio::net::TcpListener::bind(addr).await?;
1142 axum::serve(listener, app(state)).await
1143}
1144
1145async fn handle_external_event(
1146 State(state): State<RuntimeState>,
1147 Path(source): Path<String>,
1148 Json(request): Json<EventIngressRequest>,
1149) -> Result<axum::response::Response, (StatusCode, String)> {
1150 let policy = effective_policy(&state, request.policy_override).await;
1151 let provider = effective_provider(&state, request.provider).await;
1152 let attachments = materialize_attachments(
1153 &state,
1154 policy.max_inline_attachment_bytes,
1155 request.attachments,
1156 )
1157 .await
1158 .map_err(map_ingress_error)?;
1159 run_process_request(
1160 &state,
1161 ProcessRequest {
1162 session_id: request.session_id,
1163 trigger: AgentTrigger::ExternalEvent {
1164 source,
1165 payload: request.payload,
1166 attachments,
1167 },
1168 granted_scopes: request.granted_scopes,
1169 idempotency_key: request.idempotency_key,
1170 policy,
1171 provider,
1172 cancellation: tokio_util::sync::CancellationToken::new(),
1173 },
1174 )
1175 .await
1176}
1177
1178async fn handle_human_input(
1179 State(state): State<RuntimeState>,
1180 Path(actor_id): Path<String>,
1181 Json(request): Json<HumanInputIngressRequest>,
1182) -> Result<axum::response::Response, (StatusCode, String)> {
1183 let policy = effective_policy(&state, request.policy_override).await;
1184 let provider = effective_provider(&state, request.provider).await;
1185 let attachments = materialize_attachments(
1186 &state,
1187 policy.max_inline_attachment_bytes,
1188 request.attachments,
1189 )
1190 .await
1191 .map_err(map_ingress_error)?;
1192 run_process_request(
1193 &state,
1194 ProcessRequest {
1195 session_id: request.session_id,
1196 trigger: AgentTrigger::HumanInput {
1197 actor_id,
1198 content: request.content,
1199 attachments,
1200 },
1201 granted_scopes: request.granted_scopes,
1202 idempotency_key: request.idempotency_key,
1203 policy,
1204 provider,
1205 cancellation: tokio_util::sync::CancellationToken::new(),
1206 },
1207 )
1208 .await
1209}
1210
1211async fn handle_system_observation(
1212 State(state): State<RuntimeState>,
1213 Path(source): Path<String>,
1214 Json(request): Json<EventIngressRequest>,
1215) -> Result<axum::response::Response, (StatusCode, String)> {
1216 let policy = effective_policy(&state, request.policy_override).await;
1217 let provider = effective_provider(&state, request.provider).await;
1218 let attachments = materialize_attachments(
1219 &state,
1220 policy.max_inline_attachment_bytes,
1221 request.attachments,
1222 )
1223 .await
1224 .map_err(map_ingress_error)?;
1225 run_process_request(
1226 &state,
1227 ProcessRequest {
1228 session_id: request.session_id,
1229 trigger: AgentTrigger::SystemObservation {
1230 source,
1231 observation: request.payload,
1232 attachments,
1233 },
1234 granted_scopes: request.granted_scopes,
1235 idempotency_key: request.idempotency_key,
1236 policy,
1237 provider,
1238 cancellation: tokio_util::sync::CancellationToken::new(),
1239 },
1240 )
1241 .await
1242}
1243
1244async fn handle_scheduled_wake(
1245 State(state): State<RuntimeState>,
1246 Json(request): Json<ScheduledWakeIngressRequest>,
1247) -> Result<axum::response::Response, (StatusCode, String)> {
1248 let policy = effective_policy(&state, request.policy_override).await;
1249 let provider = effective_provider(&state, request.provider).await;
1250 run_process_request(
1251 &state,
1252 ProcessRequest {
1253 session_id: request.session_id,
1254 trigger: AgentTrigger::ScheduledWake {
1255 wake_id: WakeId(request.wake_id),
1256 due_at: request.due_at,
1257 reason: request.reason,
1258 },
1259 granted_scopes: request.granted_scopes,
1260 idempotency_key: None,
1261 policy,
1262 provider,
1263 cancellation: tokio_util::sync::CancellationToken::new(),
1264 },
1265 )
1266 .await
1267}
1268
1269pub fn init_tracing() {
1270 let _ = tracing_subscriber::fmt::try_init();
1271}
1272
1273async fn handle_delegation_result(
1274 State(state): State<RuntimeState>,
1275 Json(request): Json<DelegationResultIngressRequest>,
1276) -> Result<axum::response::Response, (StatusCode, String)> {
1277 let policy = effective_policy(&state, request.policy_override).await;
1278 let provider = effective_provider(&state, request.provider).await;
1279 run_process_request(
1280 &state,
1281 ProcessRequest {
1282 session_id: request.session_id,
1283 trigger: AgentTrigger::DelegationResult {
1284 correlation_id: CorrelationId(request.correlation_id),
1285 payload: request.payload,
1286 metadata: request.metadata,
1287 },
1288 granted_scopes: request.granted_scopes,
1289 idempotency_key: None,
1290 policy,
1291 provider,
1292 cancellation: tokio_util::sync::CancellationToken::new(),
1293 },
1294 )
1295 .await
1296}
1297
1298async fn handle_webhook(
1299 State(state): State<RuntimeState>,
1300 Path(source): Path<String>,
1301 request: Request,
1302) -> Result<axum::response::Response, (StatusCode, String)> {
1303 let envelope = parse_webhook_envelope(request)
1304 .await
1305 .map_err(map_ingress_error)?;
1306 let policy = effective_policy(&state, envelope.policy_override).await;
1307 let provider = effective_provider(&state, envelope.provider).await;
1308 let attachments = materialize_attachments(
1309 &state,
1310 policy.max_inline_attachment_bytes,
1311 envelope.attachments,
1312 )
1313 .await
1314 .map_err(map_ingress_error)?;
1315
1316 run_process_request(
1317 &state,
1318 ProcessRequest {
1319 session_id: envelope.session_id,
1320 trigger: AgentTrigger::Webhook {
1321 source,
1322 payload: envelope.payload,
1323 attachments,
1324 },
1325 granted_scopes: envelope.granted_scopes,
1326 idempotency_key: envelope.idempotency_key,
1327 policy,
1328 provider,
1329 cancellation: tokio_util::sync::CancellationToken::new(),
1330 },
1331 )
1332 .await
1333}
1334
1335async fn handle_approval(
1336 State(state): State<RuntimeState>,
1337 Json(request): Json<ApprovalIngressRequest>,
1338) -> Result<axum::response::Response, (StatusCode, String)> {
1339 let policy = effective_policy(&state, request.policy_override).await;
1340 let provider = effective_provider(&state, request.provider).await;
1341 run_process_request(
1342 &state,
1343 ProcessRequest {
1344 session_id: request.session_id,
1345 trigger: AgentTrigger::Approval {
1346 resume_token: rain_engine_core::ResumeToken(request.resume_token),
1347 decision: request.decision,
1348 metadata: request.metadata,
1349 },
1350 granted_scopes: request.granted_scopes,
1351 idempotency_key: None,
1352 policy,
1353 provider,
1354 cancellation: tokio_util::sync::CancellationToken::new(),
1355 },
1356 )
1357 .await
1358}
1359
1360async fn parse_webhook_envelope(request: Request) -> Result<WebhookIngressRequest, IngressError> {
1361 let content_type = request
1362 .headers()
1363 .get(CONTENT_TYPE)
1364 .and_then(|value| value.to_str().ok())
1365 .unwrap_or("application/json")
1366 .to_string();
1367 let body = to_bytes(request.into_body(), MAX_INGRESS_BODY_BYTES)
1368 .await
1369 .map_err(|err| IngressError::Malformed(err.to_string()))?;
1370
1371 if content_type.starts_with("application/json") {
1372 return serde_json::from_slice(&body)
1373 .map_err(|err| IngressError::Malformed(err.to_string()));
1374 }
1375
1376 if content_type.starts_with("multipart/form-data") {
1377 return parse_multipart_webhook(&content_type, body).await;
1378 }
1379
1380 Err(IngressError::UnsupportedContentType)
1381}
1382
1383async fn parse_multipart_webhook(
1384 content_type: &str,
1385 body: axum::body::Bytes,
1386) -> Result<WebhookIngressRequest, IngressError> {
1387 let boundary = multer::parse_boundary(content_type)
1388 .map_err(|err| IngressError::Malformed(err.to_string()))?;
1389 let stream = stream::once(async move { Ok::<_, std::convert::Infallible>(body) });
1390 let mut multipart = multer::Multipart::new(stream, boundary);
1391
1392 let mut session_id = None::<String>;
1393 let mut payload = None::<Value>;
1394 let mut attachments = Vec::new();
1395 let mut granted_scopes = BTreeSet::new();
1396 let mut idempotency_key = None::<String>;
1397 let mut provider = None::<ProviderRequestConfig>;
1398 let mut policy_override = None::<EnginePolicy>;
1399
1400 while let Some(field) = multipart
1401 .next_field()
1402 .await
1403 .map_err(|err| IngressError::Malformed(err.to_string()))?
1404 {
1405 let name = field.name().unwrap_or_default().to_string();
1406 let file_name = field.file_name().map(str::to_string);
1407 let mime_type = field
1408 .content_type()
1409 .map(|value| value.to_string())
1410 .unwrap_or_else(|| "application/octet-stream".to_string());
1411
1412 if file_name.is_some() {
1413 attachments.push(MultimodalPayload {
1414 mime_type,
1415 file_name,
1416 data: field
1417 .bytes()
1418 .await
1419 .map_err(|err| IngressError::Malformed(err.to_string()))?
1420 .to_vec(),
1421 });
1422 continue;
1423 }
1424
1425 let text = field
1426 .text()
1427 .await
1428 .map_err(|err| IngressError::Malformed(err.to_string()))?;
1429 match name.as_str() {
1430 "session_id" => session_id = Some(text),
1431 "payload" => {
1432 payload = Some(parse_json_value(&text, "payload")?);
1433 }
1434 "granted_scope" => {
1435 if !text.trim().is_empty() {
1436 granted_scopes.insert(text);
1437 }
1438 }
1439 "granted_scopes" => {
1440 let scopes = parse_json_value::<Vec<String>>(&text, "granted_scopes")?;
1441 granted_scopes.extend(scopes);
1442 }
1443 "idempotency_key" => {
1444 if !text.trim().is_empty() {
1445 idempotency_key = Some(text);
1446 }
1447 }
1448 "provider" => provider = Some(parse_json_value(&text, "provider")?),
1449 "policy_override" => {
1450 policy_override = Some(parse_json_value(&text, "policy_override")?)
1451 }
1452 _ => {}
1453 }
1454 }
1455
1456 Ok(WebhookIngressRequest {
1457 session_id: session_id
1458 .ok_or_else(|| IngressError::Malformed("missing session_id".to_string()))?,
1459 payload: payload.unwrap_or(Value::Null),
1460 attachments,
1461 granted_scopes,
1462 idempotency_key,
1463 provider,
1464 policy_override,
1465 })
1466}
1467
1468fn parse_json_value<T: DeserializeOwned>(text: &str, field_name: &str) -> Result<T, IngressError> {
1469 serde_json::from_str(text)
1470 .map_err(|err| IngressError::Malformed(format!("invalid {field_name}: {err}")))
1471}
1472
1473async fn materialize_attachments(
1474 state: &RuntimeState,
1475 max_inline_attachment_bytes: usize,
1476 payloads: Vec<MultimodalPayload>,
1477) -> Result<Vec<AttachmentRef>, IngressError> {
1478 let mut attachments = Vec::with_capacity(payloads.len());
1479 for payload in payloads {
1480 let attachment_id = Uuid::new_v4().to_string();
1481 if payload.data.len() <= max_inline_attachment_bytes {
1482 attachments.push(AttachmentRef::inline(
1483 attachment_id,
1484 payload.mime_type,
1485 payload.file_name,
1486 payload.data,
1487 ));
1488 } else {
1489 attachments.push(
1490 state
1491 .blob_store
1492 .put(attachment_id, payload)
1493 .await
1494 .map_err(|err| IngressError::Blob(err.message))?,
1495 );
1496 }
1497 }
1498 Ok(attachments)
1499}
1500
1501async fn effective_policy(
1502 state: &RuntimeState,
1503 override_policy: Option<EnginePolicy>,
1504) -> EnginePolicy {
1505 if state.config.allow_policy_overrides
1506 && let Some(p) = override_policy
1507 {
1508 return p;
1509 }
1510 state.settings.engine_policy.read().await.clone()
1511}
1512
1513async fn effective_provider(
1514 state: &RuntimeState,
1515 override_provider: Option<ProviderRequestConfig>,
1516) -> ProviderRequestConfig {
1517 if state.config.allow_provider_overrides
1518 && let Some(p) = override_provider
1519 {
1520 return p;
1521 }
1522 state.settings.provider_config.read().await.clone()
1523}
1524
1525async fn run_process_request(
1526 state: &RuntimeState,
1527 mut request: ProcessRequest,
1528) -> Result<axum::response::Response, (StatusCode, String)> {
1529 if request.granted_scopes.is_empty() {
1531 request
1532 .granted_scopes
1533 .extend(state.default_scopes.iter().cloned());
1534 }
1535
1536 if state.config.async_ingress
1537 && let Some(ingress) = &state.ingress
1538 {
1539 let envelope = rain_engine_ingress::IngressEventEnvelope {
1540 session_id: request.session_id.clone(),
1541 trigger: request.trigger,
1542 granted_scopes: request.granted_scopes,
1543 idempotency_key: request.idempotency_key,
1544 policy: Some(request.policy),
1545 provider: Some(request.provider),
1546 };
1547 ingress
1548 .publish(&envelope)
1549 .await
1550 .map_err(|err| (StatusCode::INTERNAL_SERVER_ERROR, err.to_string()))?;
1551
1552 let response_body = serde_json::json!({
1553 "session_id": request.session_id,
1554 "status": "accepted"
1555 });
1556 return Ok((StatusCode::ACCEPTED, axum::Json(response_body)).into_response());
1557 }
1558
1559 let timeout = Duration::from_millis(state.config.request_timeout_ms.max(1));
1560 match tokio::time::timeout(timeout, run_until_terminal_trace(&state.engine, request)).await {
1561 Ok(Ok(result)) => Ok(axum::Json(result).into_response()),
1562 Ok(Err(err)) => Err((StatusCode::INTERNAL_SERVER_ERROR, err.to_string())),
1563 Err(_) => Err((StatusCode::REQUEST_TIMEOUT, "request timed out".to_string())),
1564 }
1565}
1566
1567pub async fn run_until_terminal_trace(
1568 engine: &AgentEngine,
1569 request: ProcessRequest,
1570) -> Result<RuntimeRunResult, EngineError> {
1571 let mut advances = Vec::new();
1572 let mut next = AdvanceRequest::Trigger(request.clone());
1573 loop {
1574 let result = engine.advance(next).await?;
1575 if let Some(outcome) = result.outcome.clone() {
1576 advances.push(result);
1577 return Ok(RuntimeRunResult { advances, outcome });
1578 }
1579 advances.push(result);
1580 next = AdvanceRequest::Continue(ContinueRequest {
1581 session_id: request.session_id.clone(),
1582 granted_scopes: request.granted_scopes.clone(),
1583 policy: request.policy.clone(),
1584 provider: request.provider.clone(),
1585 cancellation: request.cancellation.clone(),
1586 });
1587 }
1588}
1589
1590pub async fn run_until_terminal(
1591 engine: &AgentEngine,
1592 request: ProcessRequest,
1593) -> Result<EngineOutcome, EngineError> {
1594 Ok(run_until_terminal_trace(engine, request).await?.outcome)
1595}
1596
1597fn map_ingress_error(error: IngressError) -> (StatusCode, String) {
1598 match error {
1599 IngressError::UnsupportedContentType => {
1600 (StatusCode::UNSUPPORTED_MEDIA_TYPE, error.to_string())
1601 }
1602 IngressError::Malformed(_) => (StatusCode::BAD_REQUEST, error.to_string()),
1603 IngressError::Blob(_) => (StatusCode::INTERNAL_SERVER_ERROR, error.to_string()),
1604 }
1605}
1606
/// JSON body returned by the health endpoint.
#[typeshare::typeshare]
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct HealthResponse {
    /// Health indicator; the health handler in this file always reports `"ok"`.
    pub status: String,
    /// Version string; the health handler fills it from `CARGO_PKG_VERSION`.
    pub version: String,
}
1617
1618async fn handle_health() -> Json<HealthResponse> {
1619 Json(HealthResponse {
1620 status: "ok".to_string(),
1621 version: env!("CARGO_PKG_VERSION").to_string(),
1622 })
1623}
1624
1625async fn handle_capabilities(State(state): State<RuntimeState>) -> Json<RuntimeCapabilities> {
1626 Json(RuntimeCapabilities {
1627 version: env!("CARGO_PKG_VERSION").to_string(),
1628 provider_kind: state.provider_kind.clone(),
1629 default_model: state.config.default_provider.model.clone(),
1630 streaming: true,
1631 approvals: true,
1632 multipart_uploads: true,
1633 default_scopes: state.default_scopes.clone(),
1634 default_policy: state.config.default_policy.clone(),
1635 channels: state.channel_statuses.clone(),
1636 approval_metadata: ApprovalMetadataSupport {
1637 structured_json: true,
1638 recommended_fields: vec![
1639 "actor_id".to_string(),
1640 "client".to_string(),
1641 "reason".to_string(),
1642 "decided_at_ms".to_string(),
1643 ],
1644 },
1645 wake_support: true,
1646 delegation_support: true,
1647 learning_support: true,
1648 upload_limits: UploadSupport {
1649 multipart: true,
1650 max_request_bytes: MAX_INGRESS_BODY_BYTES,
1651 },
1652 skills: state.engine.skill_definitions().await,
1653 })
1654}
1655
1656async fn handle_get_settings(State(state): State<RuntimeState>) -> Json<RuntimeSettingsView> {
1657 Json(state.settings.to_view().await)
1658}
1659
1660async fn handle_update_settings(
1661 State(state): State<RuntimeState>,
1662 Json(request): Json<RuntimeSettingsUpdateRequest>,
1663) -> Json<RuntimeSettingsView> {
1664 state.settings.apply(request).await;
1665 Json(state.settings.to_view().await)
1666}
1667
/// Query-string parameters accepted by the session listing endpoints.
///
/// Every field is optional; the handlers in this file fall back to
/// `offset = 0` and `limit = 100` and forward the time bounds unchanged.
#[typeshare::typeshare]
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
pub struct SessionListParams {
    /// Offset passed to the store query (handlers default it to 0).
    #[serde(default)]
    pub offset: Option<usize>,
    /// Limit passed to the store query (handlers default it to 100).
    #[serde(default)]
    pub limit: Option<usize>,
    /// Forwarded as the query's `since_ms`.
    // NOTE(review): presumably a lower time bound in Unix milliseconds; confirm against the store.
    #[serde(default)]
    pub since_ms: Option<i64>,
    /// Forwarded as the query's `until_ms`.
    // NOTE(review): presumably an upper time bound in Unix milliseconds; confirm against the store.
    #[serde(default)]
    pub until_ms: Option<i64>,
}
1680
1681async fn handle_list_sessions(
1682 State(state): State<RuntimeState>,
1683 Query(params): Query<SessionListParams>,
1684) -> Result<Json<Value>, (StatusCode, String)> {
1685 let query = SessionListQuery {
1686 offset: params.offset.unwrap_or(0),
1687 limit: params.limit.unwrap_or(100),
1688 since_ms: params.since_ms,
1689 until_ms: params.until_ms,
1690 };
1691 let sessions = state
1692 .memory
1693 .list_sessions(query)
1694 .await
1695 .map_err(|err| (StatusCode::INTERNAL_SERVER_ERROR, err.message))?;
1696 Ok(Json(serde_json::to_value(sessions).unwrap_or_default()))
1697}
1698
1699async fn handle_list_session_views(
1700 State(state): State<RuntimeState>,
1701 Query(params): Query<SessionListParams>,
1702) -> Result<Json<Vec<SessionListItemView>>, (StatusCode, String)> {
1703 let query = SessionListQuery {
1704 offset: params.offset.unwrap_or(0),
1705 limit: params.limit.unwrap_or(100),
1706 since_ms: params.since_ms,
1707 until_ms: params.until_ms,
1708 };
1709 let sessions = state
1710 .memory
1711 .list_sessions(query)
1712 .await
1713 .map_err(|err| (StatusCode::INTERNAL_SERVER_ERROR, err.message))?;
1714
1715 let mut views = Vec::with_capacity(sessions.len());
1716 for session in sessions {
1717 let snapshot = if let Ok(Some(cached)) = state
1718 .engine
1719 .state_cache()
1720 .get_projection(&session.session_id)
1721 .await
1722 {
1723 cached
1724 } else {
1725 state
1726 .memory
1727 .load_session(&session.session_id)
1728 .await
1729 .map_err(|err| (StatusCode::INTERNAL_SERVER_ERROR, err.message))?
1730 };
1731 views.push(build_session_list_item(snapshot));
1732 }
1733
1734 views.sort_by(|left, right| right.last_activity_at_ms.cmp(&left.last_activity_at_ms));
1735 Ok(Json(views))
1736}
1737
1738async fn handle_get_session(
1739 State(state): State<RuntimeState>,
1740 Path(session_id): Path<String>,
1741) -> Result<Json<Value>, (StatusCode, String)> {
1742 let snapshot =
1743 if let Ok(Some(cached)) = state.engine.state_cache().get_projection(&session_id).await {
1744 cached
1745 } else {
1746 state
1747 .memory
1748 .load_session(&session_id)
1749 .await
1750 .map_err(|err| (StatusCode::INTERNAL_SERVER_ERROR, err.message))?
1751 };
1752 Ok(Json(serde_json::to_value(snapshot).unwrap_or_default()))
1753}
1754
1755async fn handle_get_session_view(
1756 State(state): State<RuntimeState>,
1757 Path(session_id): Path<String>,
1758) -> Result<Json<SessionView>, (StatusCode, String)> {
1759 let snapshot =
1760 if let Ok(Some(cached)) = state.engine.state_cache().get_projection(&session_id).await {
1761 cached
1762 } else {
1763 state
1764 .memory
1765 .load_session(&session_id)
1766 .await
1767 .map_err(|err| (StatusCode::INTERNAL_SERVER_ERROR, err.message))?
1768 };
1769 Ok(Json(build_session_view(snapshot)))
1770}
1771
1772async fn handle_get_execution_graph(
1773 State(state): State<RuntimeState>,
1774 Path(session_id): Path<String>,
1775) -> Result<Json<ExecutionGraphView>, (StatusCode, String)> {
1776 let snapshot =
1777 if let Ok(Some(cached)) = state.engine.state_cache().get_projection(&session_id).await {
1778 cached
1779 } else {
1780 state
1781 .memory
1782 .load_session(&session_id)
1783 .await
1784 .map_err(|err| (StatusCode::INTERNAL_SERVER_ERROR, err.message))?
1785 };
1786 Ok(Json(build_execution_graph_view(&snapshot)))
1787}
1788
/// Query-string parameters accepted by the per-session record listing endpoint.
///
/// Every field is optional; the record listing handler in this file falls
/// back to `offset = 0` and `limit = 100` and forwards the time bounds unchanged.
#[typeshare::typeshare]
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
pub struct RecordListParams {
    /// Offset passed to the record page query (handler defaults it to 0).
    #[serde(default)]
    pub offset: Option<usize>,
    /// Limit passed to the record page query (handler defaults it to 100).
    #[serde(default)]
    pub limit: Option<usize>,
    /// Forwarded as the query's `since_ms`.
    // NOTE(review): presumably a lower time bound in Unix milliseconds; confirm against the store.
    #[serde(default)]
    pub since_ms: Option<i64>,
    /// Forwarded as the query's `until_ms`.
    // NOTE(review): presumably an upper time bound in Unix milliseconds; confirm against the store.
    #[serde(default)]
    pub until_ms: Option<i64>,
}
1801
1802async fn handle_install_skill(
1803 State(state): State<RuntimeState>,
1804 Json(request): Json<InstallSkillRequest>,
1805) -> Result<StatusCode, (StatusCode, String)> {
1806 let direct_install_enabled = std::env::var("RAIN_ENABLE_DIRECT_SKILL_INSTALL")
1807 .map(|value| value == "true" || value == "1")
1808 .unwrap_or(false);
1809 if !direct_install_enabled {
1810 return Err((
1811 StatusCode::FORBIDDEN,
1812 "direct skill install is disabled; use operator tooling or set RAIN_ENABLE_DIRECT_SKILL_INSTALL=true"
1813 .to_string(),
1814 ));
1815 }
1816
1817 info!("Attempting to install skill: {}", request.manifest.name);
1818
1819 let wasm_bytes = if let Some(url) = &request.wasm_url {
1820 let client = reqwest::Client::new();
1821 client
1822 .get(url)
1823 .send()
1824 .await
1825 .map_err(|err| (StatusCode::BAD_GATEWAY, format!("Download failed: {}", err)))?
1826 .bytes()
1827 .await
1828 .map_err(|err| (StatusCode::BAD_GATEWAY, format!("Read failed: {}", err)))?
1829 .to_vec()
1830 } else if let Some(b64) = &request.wasm_base64 {
1831 use base64::{Engine as _, engine::general_purpose};
1832 general_purpose::STANDARD.decode(b64).map_err(|err| {
1833 (
1834 StatusCode::BAD_REQUEST,
1835 format!("Base64 decode failed: {}", err),
1836 )
1837 })?
1838 } else if let Some(path) = &request.file_path {
1839 tokio::fs::read(path).await.map_err(|err| {
1840 (
1841 StatusCode::BAD_REQUEST,
1842 format!("File read failed: {}", err),
1843 )
1844 })?
1845 } else {
1846 return Err((
1847 StatusCode::BAD_REQUEST,
1848 "Must provide wasm_url, wasm_base64, or file_path".to_string(),
1849 ));
1850 };
1851
1852 let config = rain_engine_wasm::WasmSkillConfig {
1853 manifest: request.manifest.clone(),
1854 wasm_bytes: Arc::new(wasm_bytes.clone()),
1855 capabilities: Arc::new(rain_engine_wasm::InMemoryCapabilityHost::new().with_http_client()),
1856 };
1857
1858 let executor = rain_engine_wasm::WasmSkillExecutor::new(config).map_err(|err| {
1859 (
1860 StatusCode::INTERNAL_SERVER_ERROR,
1861 format!("Init failed: {}", err),
1862 )
1863 })?;
1864 let executor = Arc::new(executor);
1865 state
1866 .engine
1867 .register_wasm_skill_persistent(request.manifest, executor.clone(), wasm_bytes)
1868 .await
1869 .map_err(|err| {
1870 (
1871 StatusCode::INTERNAL_SERVER_ERROR,
1872 format!("Storage failed: {err}"),
1873 )
1874 })?;
1875
1876 info!(
1877 "Skill successfully registered: {}",
1878 executor.manifest().name
1879 );
1880 Ok(StatusCode::CREATED)
1881}
1882
1883async fn handle_list_records(
1884 State(state): State<RuntimeState>,
1885 Path(session_id): Path<String>,
1886 Query(params): Query<RecordListParams>,
1887) -> Result<Json<Value>, (StatusCode, String)> {
1888 let query = RecordPageQuery {
1889 session_id,
1890 offset: params.offset.unwrap_or(0),
1891 limit: params.limit.unwrap_or(100),
1892 since_ms: params.since_ms,
1893 until_ms: params.until_ms,
1894 };
1895 let page = state
1896 .memory
1897 .list_records(query)
1898 .await
1899 .map_err(|err| (StatusCode::INTERNAL_SERVER_ERROR, err.message))?;
1900 Ok(Json(serde_json::to_value(page).unwrap_or_default()))
1901}
1902
1903fn build_session_view(snapshot: SessionSnapshot) -> SessionView {
1904 let state = snapshot.agent_state();
1905 let pending_approval = latest_pending_approval(&snapshot);
1906 let timeline = build_timeline(&snapshot.records);
1907 let tool_timeline = build_tool_timeline(&snapshot.records);
1908 let status = derive_session_status(&snapshot, pending_approval.as_ref());
1909 let activity_state = derive_activity_state(&snapshot, pending_approval.as_ref());
1910 let total_estimated_cost_usd = snapshot.total_estimated_cost_usd();
1911 let self_improvement = build_self_improvement_view(&snapshot);
1912 let execution_graph = build_execution_graph_view(&snapshot);
1913 let current_task = current_task(&state);
1914 let active_channel_ids = derive_active_channel_ids(&snapshot.records);
1915 let pending_wake = state.pending_wake.as_ref().map(pending_wake_view);
1916 let current_focus = derive_current_focus(
1917 &snapshot,
1918 pending_approval.as_ref(),
1919 activity_state.clone(),
1920 current_task.as_ref(),
1921 pending_wake.as_ref(),
1922 );
1923 let blocked_reason = derive_blocked_reason(
1924 pending_approval.as_ref(),
1925 current_task.as_ref(),
1926 pending_wake.as_ref(),
1927 );
1928
1929 SessionView {
1930 session_id: snapshot.session_id,
1931 status,
1932 activity_state,
1933 current_focus,
1934 current_task_id: current_task.as_ref().map(|task| task.task_id.0.clone()),
1935 current_task_title: current_task.as_ref().map(|task| task.title.clone()),
1936 next_wake_at_ms: pending_wake.as_ref().and_then(|wake| wake.due_at_ms),
1937 blocked_reason,
1938 last_human_input_at_ms: last_human_input_at_ms(&snapshot.records),
1939 last_assistant_activity_at_ms: last_assistant_activity_at_ms(&snapshot.records),
1940 active_channel_ids,
1941 pending_wake,
1942 wake_history: build_wake_history(&snapshot.records),
1943 last_heartbeat: build_last_heartbeat(&snapshot.records),
1944 last_sequence_no: snapshot.last_sequence_no,
1945 latest_outcome: snapshot.latest_outcome,
1946 pending_approval,
1947 state,
1948 timeline,
1949 tool_timeline,
1950 self_improvement,
1951 execution_graph,
1952 record_count: snapshot.records.len(),
1953 total_estimated_cost_usd,
1954 }
1955}
1956
1957fn build_session_list_item(snapshot: SessionSnapshot) -> SessionListItemView {
1958 let state = snapshot.agent_state();
1959 let pending_approval = latest_pending_approval(&snapshot);
1960 let status = derive_session_status(&snapshot, pending_approval.as_ref());
1961 let activity_state = derive_activity_state(&snapshot, pending_approval.as_ref());
1962 let pending_wake = state.pending_wake.as_ref().map(pending_wake_view);
1963 let current_task = current_task(&state);
1964 let current_focus = derive_current_focus(
1965 &snapshot,
1966 pending_approval.as_ref(),
1967 activity_state.clone(),
1968 current_task.as_ref(),
1969 pending_wake.as_ref(),
1970 );
1971
1972 SessionListItemView {
1973 session_id: snapshot.session_id.clone(),
1974 status,
1975 activity_state: activity_state.clone(),
1976 current_focus,
1977 latest_provider: latest_provider_name(&snapshot.records),
1978 last_activity_at_ms: last_activity_at_ms(&snapshot.records),
1979 pending_approval: pending_approval.is_some(),
1980 pending_wake: pending_wake.is_some(),
1981 unread_event_count: derive_unread_event_count(&snapshot.records),
1982 active_channel_ids: derive_active_channel_ids(&snapshot.records),
1983 record_count: snapshot.records.len(),
1984 }
1985}
1986
1987fn build_execution_graph_view(snapshot: &SessionSnapshot) -> ExecutionGraphView {
1988 let active_graph = snapshot.active_tool_execution_graph();
1989 let graphs = snapshot.tool_execution_graphs();
1990 let checkpoints = snapshot.tool_node_checkpoints();
1991 let validations = snapshot.skill_input_validations();
1992 let mut latest = BTreeMap::<String, rain_engine_core::ToolNodeStatus>::new();
1993 for checkpoint in checkpoints.iter().filter(|checkpoint| {
1994 active_graph
1995 .as_ref()
1996 .map(|graph| graph.graph_id == checkpoint.graph_id)
1997 .unwrap_or(false)
1998 }) {
1999 latest.insert(checkpoint.call_id.clone(), checkpoint.status.clone());
2000 }
2001 let blocked_call_ids = active_graph
2002 .as_ref()
2003 .map(|graph| {
2004 graph
2005 .nodes
2006 .iter()
2007 .filter(|node| {
2008 node.dependencies.iter().any(|dependency| {
2009 matches!(
2010 latest.get(&dependency.call_id),
2011 Some(rain_engine_core::ToolNodeStatus::Failed)
2012 | Some(rain_engine_core::ToolNodeStatus::Skipped)
2013 | Some(rain_engine_core::ToolNodeStatus::TimedOut)
2014 )
2015 })
2016 })
2017 .map(|node| node.call_id.clone())
2018 .collect()
2019 })
2020 .unwrap_or_default();
2021
2022 ExecutionGraphView {
2023 active_graph,
2024 graphs,
2025 checkpoints,
2026 validations,
2027 blocked_call_ids,
2028 }
2029}
2030
2031fn build_self_improvement_view(snapshot: &SessionSnapshot) -> SelfImprovementView {
2032 SelfImprovementView {
2033 active_overlay: snapshot.active_policy_overlay(),
2034 reflections: snapshot.reflections(),
2035 policy_tunings: snapshot.policy_tunings(),
2036 strategy_preferences: snapshot.strategy_preferences(),
2037 tool_performance: snapshot.tool_performance_records(),
2038 profile_patches: snapshot
2039 .records
2040 .iter()
2041 .filter_map(|record| match record {
2042 SessionRecord::ProfilePatch(patch) => Some(patch.clone()),
2043 _ => None,
2044 })
2045 .collect(),
2046 }
2047}
2048
2049fn derive_session_status(
2050 snapshot: &SessionSnapshot,
2051 pending_approval: Option<&ApprovalView>,
2052) -> SessionStatus {
2053 if snapshot.records.is_empty() {
2054 return SessionStatus::Empty;
2055 }
2056
2057 if pending_approval.is_some() {
2058 return SessionStatus::Suspended;
2059 }
2060
2061 match snapshot
2062 .latest_outcome
2063 .as_ref()
2064 .map(|outcome| &outcome.stop_reason)
2065 {
2066 None => SessionStatus::Running,
2067 Some(StopReason::Responded | StopReason::Yielded) => SessionStatus::Completed,
2068 Some(StopReason::Suspended) => SessionStatus::Suspended,
2069 Some(StopReason::Delegated) => SessionStatus::Delegated,
2070 Some(
2071 StopReason::ProviderFailure
2072 | StopReason::StorageFailure
2073 | StopReason::PolicyAborted
2074 | StopReason::DeadlineExceeded
2075 | StopReason::Cancelled,
2076 ) => SessionStatus::Failed,
2077 Some(StopReason::MaxStepsReached) => SessionStatus::Stopped,
2078 }
2079}
2080
2081fn derive_activity_state(
2082 snapshot: &SessionSnapshot,
2083 pending_approval: Option<&ApprovalView>,
2084) -> SessionActivityState {
2085 if pending_approval.is_some() {
2086 return SessionActivityState::WaitingHuman;
2087 }
2088
2089 if snapshot.active_tool_execution_graph().is_some() {
2090 return SessionActivityState::RunningTools;
2091 }
2092
2093 if snapshot
2094 .records
2095 .iter()
2096 .rev()
2097 .find_map(|record| match record {
2098 SessionRecord::Deliberation(_) => Some(SessionActivityState::Reasoning),
2099 SessionRecord::Delegation(_) => Some(SessionActivityState::Delegated),
2100 SessionRecord::KernelEvent(event) => match &event.event {
2101 rain_engine_core::KernelEvent::TaskBlocked { .. } => {
2102 Some(SessionActivityState::WaitingExternal)
2103 }
2104 rain_engine_core::KernelEvent::WakeRequested(_)
2105 | rain_engine_core::KernelEvent::WakeScheduled(_) => {
2106 Some(SessionActivityState::Scheduled)
2107 }
2108 _ => None,
2109 },
2110 SessionRecord::Outcome(outcome) => match outcome.stop_reason {
2111 StopReason::Delegated => Some(SessionActivityState::Delegated),
2112 StopReason::ProviderFailure
2113 | StopReason::StorageFailure
2114 | StopReason::PolicyAborted
2115 | StopReason::DeadlineExceeded
2116 | StopReason::Cancelled => Some(SessionActivityState::Errored),
2117 _ => None,
2118 },
2119 _ => None,
2120 })
2121 .is_some()
2122 {
2123 return snapshot
2124 .records
2125 .iter()
2126 .rev()
2127 .find_map(|record| match record {
2128 SessionRecord::Deliberation(_) => Some(SessionActivityState::Reasoning),
2129 SessionRecord::Delegation(_) => Some(SessionActivityState::Delegated),
2130 SessionRecord::KernelEvent(event) => match &event.event {
2131 rain_engine_core::KernelEvent::TaskBlocked { .. } => {
2132 Some(SessionActivityState::WaitingExternal)
2133 }
2134 rain_engine_core::KernelEvent::WakeRequested(_)
2135 | rain_engine_core::KernelEvent::WakeScheduled(_) => {
2136 Some(SessionActivityState::Scheduled)
2137 }
2138 _ => None,
2139 },
2140 SessionRecord::Outcome(outcome) => match outcome.stop_reason {
2141 StopReason::Delegated => Some(SessionActivityState::Delegated),
2142 StopReason::ProviderFailure
2143 | StopReason::StorageFailure
2144 | StopReason::PolicyAborted
2145 | StopReason::DeadlineExceeded
2146 | StopReason::Cancelled => Some(SessionActivityState::Errored),
2147 _ => None,
2148 },
2149 _ => None,
2150 })
2151 .unwrap_or(SessionActivityState::Idle);
2152 }
2153
2154 match snapshot
2155 .latest_outcome
2156 .as_ref()
2157 .map(|outcome| &outcome.stop_reason)
2158 {
2159 Some(StopReason::Delegated) => SessionActivityState::Delegated,
2160 Some(
2161 StopReason::ProviderFailure
2162 | StopReason::StorageFailure
2163 | StopReason::PolicyAborted
2164 | StopReason::DeadlineExceeded
2165 | StopReason::Cancelled,
2166 ) => SessionActivityState::Errored,
2167 _ if snapshot.agent_state().pending_wake.is_some() => SessionActivityState::Scheduled,
2168 _ => SessionActivityState::Idle,
2169 }
2170}
2171
2172fn current_task(state: &AgentStateSnapshot) -> Option<rain_engine_core::TaskRecord> {
2173 let priority_order = |status: &rain_engine_core::TaskStatus| match status {
2174 rain_engine_core::TaskStatus::Running => 0,
2175 rain_engine_core::TaskStatus::Ready => 1,
2176 rain_engine_core::TaskStatus::WaitingHuman => 2,
2177 rain_engine_core::TaskStatus::Blocked => 3,
2178 rain_engine_core::TaskStatus::Pending => 4,
2179 rain_engine_core::TaskStatus::Failed => 5,
2180 rain_engine_core::TaskStatus::Done => 6,
2181 rain_engine_core::TaskStatus::Abandoned => 7,
2182 };
2183
2184 state
2185 .tasks
2186 .iter()
2187 .filter(|task| {
2188 !matches!(
2189 task.status,
2190 rain_engine_core::TaskStatus::Done | rain_engine_core::TaskStatus::Abandoned
2191 )
2192 })
2193 .min_by_key(|task| (priority_order(&task.status), unix_time_ms(task.created_at)))
2194 .cloned()
2195}
2196
2197fn derive_current_focus(
2198 snapshot: &SessionSnapshot,
2199 pending_approval: Option<&ApprovalView>,
2200 activity_state: SessionActivityState,
2201 current_task: Option<&rain_engine_core::TaskRecord>,
2202 pending_wake: Option<&WakeView>,
2203) -> Option<String> {
2204 if let Some(approval) = pending_approval {
2205 return Some(approval.reason.clone());
2206 }
2207
2208 if let Some(task) = current_task {
2209 return Some(match task.status {
2210 rain_engine_core::TaskStatus::Running => format!("Working on {}", task.title),
2211 rain_engine_core::TaskStatus::Ready => format!("Ready to start {}", task.title),
2212 rain_engine_core::TaskStatus::WaitingHuman => {
2213 format!("Awaiting approval for {}", task.title)
2214 }
2215 rain_engine_core::TaskStatus::Blocked => format!("Blocked on {}", task.title),
2216 rain_engine_core::TaskStatus::Pending => format!("Queued {}", task.title),
2217 rain_engine_core::TaskStatus::Failed => format!("Recovering {}", task.title),
2218 rain_engine_core::TaskStatus::Done | rain_engine_core::TaskStatus::Abandoned => {
2219 task.title.clone()
2220 }
2221 });
2222 }
2223
2224 if let Some(wake) = pending_wake {
2225 return Some(format!("Scheduled wake: {}", wake.reason));
2226 }
2227
2228 if let Some(deliberation) = snapshot
2229 .records
2230 .iter()
2231 .rev()
2232 .find_map(|record| match record {
2233 SessionRecord::Deliberation(deliberation) => Some(deliberation.summary.clone()),
2234 _ => None,
2235 })
2236 {
2237 return Some(deliberation);
2238 }
2239
2240 if let Some(outcome) = snapshot.latest_outcome.as_ref() {
2241 return outcome
2242 .response
2243 .clone()
2244 .or_else(|| outcome.detail.clone())
2245 .map(|text| truncate_preview(&text));
2246 }
2247
2248 Some(match activity_state {
2249 SessionActivityState::RunningTools => "Executing tools".to_string(),
2250 SessionActivityState::Reasoning => "Evaluating next action".to_string(),
2251 SessionActivityState::WaitingExternal => "Waiting on external state".to_string(),
2252 SessionActivityState::Scheduled => "Waiting for scheduled wake".to_string(),
2253 SessionActivityState::Delegated => "Waiting on delegated work".to_string(),
2254 SessionActivityState::Errored => "Investigating a failed step".to_string(),
2255 SessionActivityState::WaitingHuman => "Waiting on a human decision".to_string(),
2256 SessionActivityState::Idle => "Idle until the next event".to_string(),
2257 })
2258}
2259
2260fn derive_blocked_reason(
2261 pending_approval: Option<&ApprovalView>,
2262 current_task: Option<&rain_engine_core::TaskRecord>,
2263 pending_wake: Option<&WakeView>,
2264) -> Option<String> {
2265 if let Some(approval) = pending_approval {
2266 return Some(approval.reason.clone());
2267 }
2268 if let Some(task) = current_task
2269 && matches!(
2270 task.status,
2271 rain_engine_core::TaskStatus::Blocked | rain_engine_core::TaskStatus::WaitingHuman
2272 )
2273 {
2274 return task.detail.clone().or_else(|| {
2275 if task.blocked_by.is_empty() {
2276 None
2277 } else {
2278 Some(format!("blocked by {}", task.blocked_by.len()))
2279 }
2280 });
2281 }
2282 pending_wake.map(|wake| wake.reason.clone())
2283}
2284
2285fn derive_active_channel_ids(records: &[SessionRecord]) -> Vec<String> {
2286 let mut channels = BTreeSet::new();
2287 for record in records {
2288 if let SessionRecord::Trigger(trigger) = record {
2289 match &trigger.trigger {
2290 AgentTrigger::HumanInput { actor_id, .. }
2291 | AgentTrigger::Message {
2292 user_id: actor_id, ..
2293 } => {
2294 if let Some((channel, _)) = actor_id.split_once(':') {
2295 channels.insert(channel.to_string());
2296 }
2297 }
2298 _ => {}
2299 }
2300 }
2301 }
2302 channels.into_iter().collect()
2303}
2304
2305fn last_human_input_at_ms(records: &[SessionRecord]) -> Option<i64> {
2306 records.iter().rev().find_map(|record| match record {
2307 SessionRecord::Trigger(trigger)
2308 if matches!(
2309 trigger.trigger,
2310 AgentTrigger::HumanInput { .. } | AgentTrigger::Message { .. }
2311 ) =>
2312 {
2313 Some(unix_time_ms(trigger.recorded_at))
2314 }
2315 _ => None,
2316 })
2317}
2318
2319fn last_assistant_activity_at_ms(records: &[SessionRecord]) -> Option<i64> {
2320 records.iter().rev().find_map(|record| match record {
2321 SessionRecord::Outcome(outcome) => Some(unix_time_ms(outcome.finished_at)),
2322 SessionRecord::ToolResult(result) => Some(unix_time_ms(result.finished_at)),
2323 SessionRecord::Deliberation(deliberation) => Some(unix_time_ms(deliberation.created_at)),
2324 _ => None,
2325 })
2326}
2327
2328fn last_activity_at_ms(records: &[SessionRecord]) -> i64 {
2329 records
2330 .iter()
2331 .rev()
2332 .find_map(record_occurred_at_ms)
2333 .unwrap_or_default()
2334}
2335
2336fn record_occurred_at_ms(record: &SessionRecord) -> Option<i64> {
2337 match record {
2338 SessionRecord::Trigger(trigger) => Some(unix_time_ms(trigger.recorded_at)),
2339 SessionRecord::TriggerIntent(intent) => Some(unix_time_ms(intent.classified_at)),
2340 SessionRecord::KernelEvent(event) => Some(unix_time_ms(event.occurred_at)),
2341 SessionRecord::ModelDecision(decision) => Some(unix_time_ms(decision.decided_at)),
2342 SessionRecord::Deliberation(deliberation) => Some(unix_time_ms(deliberation.created_at)),
2343 SessionRecord::ToolExecutionGraph(graph) => Some(unix_time_ms(graph.created_at)),
2344 SessionRecord::ToolNodeCheckpoint(checkpoint) => Some(unix_time_ms(checkpoint.occurred_at)),
2345 SessionRecord::SkillInputValidation(validation) => {
2346 Some(unix_time_ms(validation.validated_at))
2347 }
2348 SessionRecord::ToolCall(call) => Some(unix_time_ms(call.called_at)),
2349 SessionRecord::ToolResult(result) => Some(unix_time_ms(result.finished_at)),
2350 SessionRecord::PendingApproval(approval) => Some(unix_time_ms(approval.created_at)),
2351 SessionRecord::ApprovalResolution(resolution) => Some(unix_time_ms(resolution.resolved_at)),
2352 SessionRecord::Delegation(delegation) => Some(unix_time_ms(delegation.created_at)),
2353 SessionRecord::CoordinationClaim(claim) => Some(unix_time_ms(claim.claimed_at)),
2354 SessionRecord::ProviderUsage(usage) => Some(unix_time_ms(usage.recorded_at)),
2355 SessionRecord::ProviderCache(cache) => Some(unix_time_ms(cache.cached_at)),
2356 SessionRecord::Reflection(reflection) => Some(unix_time_ms(reflection.created_at)),
2357 SessionRecord::PolicyTuning(tuning) => Some(unix_time_ms(tuning.created_at)),
2358 SessionRecord::StrategyPreference(preference) => Some(unix_time_ms(preference.created_at)),
2359 SessionRecord::ToolPerformance(perf) => Some(unix_time_ms(perf.created_at)),
2360 SessionRecord::ProfilePatch(patch) => Some(unix_time_ms(patch.created_at)),
2361 SessionRecord::ExecutionPlan(plan) => Some(unix_time_ms(plan.created_at)),
2362 SessionRecord::Summary(summary) => Some(unix_time_ms(summary.created_at)),
2363 SessionRecord::Outcome(outcome) => Some(unix_time_ms(outcome.finished_at)),
2364 }
2365}
2366
2367fn latest_provider_name(records: &[SessionRecord]) -> Option<String> {
2368 records.iter().rev().find_map(|record| match record {
2369 SessionRecord::ProviderUsage(usage) => Some(usage.provider_name.clone()),
2370 _ => None,
2371 })
2372}
2373
2374fn derive_unread_event_count(records: &[SessionRecord]) -> usize {
2375 let last_human_at = last_human_input_at_ms(records).unwrap_or_default();
2376 records
2377 .iter()
2378 .filter(|record| {
2379 record_occurred_at_ms(record).is_some_and(|at| {
2380 at >= last_human_at
2381 && !matches!(
2382 record,
2383 SessionRecord::Trigger(trigger)
2384 if matches!(
2385 trigger.trigger,
2386 AgentTrigger::HumanInput { .. } | AgentTrigger::Message { .. }
2387 )
2388 )
2389 })
2390 })
2391 .count()
2392}
2393
2394fn pending_wake_view(wake: &rain_engine_core::WakeRequestRecord) -> WakeView {
2395 WakeView {
2396 wake_id: wake.wake_id.0.clone(),
2397 reason: wake.reason.clone(),
2398 status: "scheduled".to_string(),
2399 occurred_at_ms: unix_time_ms(wake.requested_at),
2400 due_at_ms: Some(unix_time_ms(wake.due_at)),
2401 task_id: wake.task_id.as_ref().map(|task_id| task_id.0.clone()),
2402 }
2403}
2404
2405fn build_wake_history(records: &[SessionRecord]) -> Vec<WakeView> {
2406 let mut wakes = Vec::new();
2407 for record in records.iter().rev() {
2408 match record {
2409 SessionRecord::KernelEvent(event) => match &event.event {
2410 rain_engine_core::KernelEvent::WakeRequested(wake)
2411 | rain_engine_core::KernelEvent::WakeScheduled(wake) => wakes.push(WakeView {
2412 wake_id: wake.wake_id.0.clone(),
2413 reason: wake.reason.clone(),
2414 status: "scheduled".to_string(),
2415 occurred_at_ms: unix_time_ms(event.occurred_at),
2416 due_at_ms: Some(unix_time_ms(wake.due_at)),
2417 task_id: wake.task_id.as_ref().map(|task_id| task_id.0.clone()),
2418 }),
2419 rain_engine_core::KernelEvent::WakeCompleted {
2420 wake_id, reason, ..
2421 } => {
2422 wakes.push(WakeView {
2423 wake_id: wake_id.0.clone(),
2424 reason: reason.clone(),
2425 status: "completed".to_string(),
2426 occurred_at_ms: unix_time_ms(event.occurred_at),
2427 due_at_ms: None,
2428 task_id: None,
2429 });
2430 }
2431 _ => {}
2432 },
2433 SessionRecord::Trigger(trigger)
2434 if matches!(trigger.trigger, AgentTrigger::ScheduledWake { .. }) =>
2435 {
2436 if let AgentTrigger::ScheduledWake {
2437 wake_id,
2438 due_at,
2439 reason,
2440 } = &trigger.trigger
2441 {
2442 wakes.push(WakeView {
2443 wake_id: wake_id.0.clone(),
2444 reason: reason.clone(),
2445 status: "fired".to_string(),
2446 occurred_at_ms: unix_time_ms(trigger.recorded_at),
2447 due_at_ms: Some(unix_time_ms(*due_at)),
2448 task_id: None,
2449 });
2450 }
2451 }
2452 _ => {}
2453 }
2454 if wakes.len() >= 12 {
2455 break;
2456 }
2457 }
2458 wakes
2459}
2460
2461fn build_last_heartbeat(records: &[SessionRecord]) -> Option<HeartbeatStatusView> {
2462 let mut latest_wake: Option<(String, String, i64)> = None;
2463 for record in records.iter().rev() {
2464 if let SessionRecord::Trigger(trigger) = record
2465 && let AgentTrigger::ScheduledWake {
2466 wake_id, reason, ..
2467 } = &trigger.trigger
2468 && reason.to_ascii_lowercase().contains("heartbeat")
2469 {
2470 latest_wake = Some((
2471 wake_id.0.clone(),
2472 reason.clone(),
2473 unix_time_ms(trigger.recorded_at),
2474 ));
2475 break;
2476 }
2477 }
2478
2479 let (wake_id, reason, occurred_at_ms) = latest_wake?;
2480 let outcome = records.iter().rev().find_map(|record| match record {
2481 SessionRecord::Outcome(outcome) if unix_time_ms(outcome.finished_at) >= occurred_at_ms => {
2482 Some(outcome)
2483 }
2484 _ => None,
2485 });
2486
2487 Some(HeartbeatStatusView {
2488 wake_id,
2489 reason,
2490 occurred_at_ms,
2491 outcome_summary: outcome
2492 .and_then(|outcome| outcome.response.clone().or_else(|| outcome.detail.clone()))
2493 .map(|text| truncate_preview(&text)),
2494 stop_reason: outcome.map(|outcome| outcome.stop_reason.clone()),
2495 })
2496}
2497
2498fn latest_pending_approval(snapshot: &SessionSnapshot) -> Option<ApprovalView> {
2499 let resolved = snapshot
2500 .records
2501 .iter()
2502 .filter_map(|record| match record {
2503 SessionRecord::ApprovalResolution(resolution) => {
2504 Some(resolution.resume_token.0.clone())
2505 }
2506 _ => None,
2507 })
2508 .collect::<BTreeSet<_>>();
2509
2510 let target_token = snapshot
2511 .latest_outcome
2512 .as_ref()
2513 .filter(|outcome| outcome.stop_reason == StopReason::Suspended)
2514 .and_then(|outcome| outcome.resume_token.as_ref())
2515 .map(|token| token.0.as_str());
2516
2517 snapshot
2518 .records
2519 .iter()
2520 .rev()
2521 .find_map(|record| match record {
2522 SessionRecord::PendingApproval(approval)
2523 if !resolved.contains(&approval.resume_token.0)
2524 && target_token
2525 .map(|token| token == approval.resume_token.0.as_str())
2526 .unwrap_or(true) =>
2527 {
2528 Some(approval_view(approval))
2529 }
2530 _ => None,
2531 })
2532}
2533
2534fn approval_view(record: &PendingApprovalRecord) -> ApprovalView {
2535 ApprovalView {
2536 resume_token: record.resume_token.0.clone(),
2537 created_at_ms: unix_time_ms(record.created_at),
2538 trigger_id: record.trigger_id.clone(),
2539 step: record.step,
2540 reason: match &record.reason {
2541 rain_engine_core::SuspendReason::HumanApprovalRequired { skill_names } => {
2542 format!("Human approval required for {}", skill_names.join(", "))
2543 }
2544 rain_engine_core::SuspendReason::ProviderRequested { message } => message.clone(),
2545 },
2546 pending_calls: record.pending_calls.clone(),
2547 }
2548}
2549
2550fn build_timeline(records: &[SessionRecord]) -> Vec<TimelineItem> {
2551 records
2552 .iter()
2553 .filter_map(|record| match record {
2554 SessionRecord::Trigger(trigger) => match &trigger.trigger {
2555 AgentTrigger::HumanInput {
2556 actor_id, content, ..
2557 } => Some(TimelineItem::HumanInput {
2558 actor_id: actor_id.clone(),
2559 content: content.clone(),
2560 occurred_at_ms: unix_time_ms(trigger.recorded_at),
2561 }),
2562 AgentTrigger::ExternalEvent { source, .. }
2563 | AgentTrigger::Webhook { source, .. }
2564 | AgentTrigger::SystemObservation { source, .. } => Some(TimelineItem::System {
2565 label: "event received".to_string(),
2566 detail: source.clone(),
2567 occurred_at_ms: unix_time_ms(trigger.recorded_at),
2568 }),
2569 AgentTrigger::ScheduledWake { reason, .. } => Some(TimelineItem::System {
2570 label: "scheduled wake".to_string(),
2571 detail: reason.clone(),
2572 occurred_at_ms: unix_time_ms(trigger.recorded_at),
2573 }),
2574 AgentTrigger::Approval { decision, .. } => Some(TimelineItem::System {
2575 label: "approval submitted".to_string(),
2576 detail: format!("{decision:?}"),
2577 occurred_at_ms: unix_time_ms(trigger.recorded_at),
2578 }),
2579 AgentTrigger::DelegationResult { correlation_id, .. } => {
2580 Some(TimelineItem::System {
2581 label: "delegation result".to_string(),
2582 detail: correlation_id.0.clone(),
2583 occurred_at_ms: unix_time_ms(trigger.recorded_at),
2584 })
2585 }
2586 AgentTrigger::RuleTrigger { rule_id, .. } => Some(TimelineItem::System {
2587 label: "rule trigger".to_string(),
2588 detail: rule_id.clone(),
2589 occurred_at_ms: unix_time_ms(trigger.recorded_at),
2590 }),
2591 AgentTrigger::ProactiveHeartbeat { .. } => Some(TimelineItem::System {
2592 label: "heartbeat".to_string(),
2593 detail: "runtime wake".to_string(),
2594 occurred_at_ms: unix_time_ms(trigger.recorded_at),
2595 }),
2596 AgentTrigger::Message {
2597 user_id, content, ..
2598 } => Some(TimelineItem::HumanInput {
2599 actor_id: user_id.clone(),
2600 content: content.clone(),
2601 occurred_at_ms: unix_time_ms(trigger.recorded_at),
2602 }),
2603 },
2604 SessionRecord::TriggerIntent(intent) => Some(TimelineItem::System {
2605 label: "intent classified".to_string(),
2606 detail: intent.intent.clone(),
2607 occurred_at_ms: unix_time_ms(intent.classified_at),
2608 }),
2609 SessionRecord::KernelEvent(event) => match &event.event {
2610 rain_engine_core::KernelEvent::WakeRequested(wake)
2611 | rain_engine_core::KernelEvent::WakeScheduled(wake) => {
2612 Some(TimelineItem::System {
2613 label: "wake scheduled".to_string(),
2614 detail: format!("{} · {}", wake.wake_id.0, wake.reason),
2615 occurred_at_ms: unix_time_ms(event.occurred_at),
2616 })
2617 }
2618 rain_engine_core::KernelEvent::WakeCompleted {
2619 wake_id, reason, ..
2620 } => Some(TimelineItem::System {
2621 label: "wake completed".to_string(),
2622 detail: format!("{}: {}", wake_id.0, reason),
2623 occurred_at_ms: unix_time_ms(event.occurred_at),
2624 }),
2625 _ => None,
2626 },
2627 SessionRecord::Outcome(outcome) => outcome
2628 .response
2629 .clone()
2630 .or_else(|| outcome.detail.clone())
2631 .map(|content| TimelineItem::AssistantResponse {
2632 content,
2633 stop_reason: outcome.stop_reason.clone(),
2634 occurred_at_ms: unix_time_ms(outcome.finished_at),
2635 }),
2636 SessionRecord::ToolCall(call) => Some(TimelineItem::ToolCall {
2637 call_id: call.call_id.clone(),
2638 skill_name: call.skill_name.clone(),
2639 formatted_call: format_tool_call(&call.skill_name, &call.args),
2640 occurred_at_ms: unix_time_ms(call.called_at),
2641 }),
2642 SessionRecord::ToolResult(result) => {
2643 let (success, preview) = tool_result_preview(result);
2644 Some(TimelineItem::ToolResult {
2645 call_id: result.call_id.clone(),
2646 skill_name: result.skill_name.clone(),
2647 success,
2648 preview,
2649 occurred_at_ms: unix_time_ms(result.finished_at),
2650 })
2651 }
2652 SessionRecord::PendingApproval(approval) => Some(TimelineItem::ApprovalRequested {
2653 resume_token: approval.resume_token.0.clone(),
2654 pending_calls: approval.pending_calls.clone(),
2655 occurred_at_ms: unix_time_ms(approval.created_at),
2656 }),
2657 SessionRecord::ApprovalResolution(resolution) => Some(TimelineItem::ApprovalResolved {
2658 resume_token: resolution.resume_token.0.clone(),
2659 decision: resolution.decision.clone(),
2660 occurred_at_ms: unix_time_ms(resolution.resolved_at),
2661 }),
2662 SessionRecord::Deliberation(deliberation) => Some(TimelineItem::Plan {
2663 summary: deliberation.summary.clone(),
2664 candidate_actions: deliberation.candidate_actions.clone(),
2665 confidence: deliberation.confidence,
2666 outcome: deliberation.outcome.clone(),
2667 occurred_at_ms: unix_time_ms(deliberation.created_at),
2668 }),
2669 SessionRecord::ToolNodeCheckpoint(checkpoint) => Some(TimelineItem::ToolCheckpoint {
2670 call_id: checkpoint.call_id.clone(),
2671 skill_name: checkpoint.skill_name.clone(),
2672 status: checkpoint.status.clone(),
2673 attempt: checkpoint.attempt,
2674 detail: checkpoint.detail.clone(),
2675 occurred_at_ms: unix_time_ms(checkpoint.occurred_at),
2676 }),
2677 SessionRecord::SkillInputValidation(validation) if !validation.valid => {
2678 Some(TimelineItem::ValidationFailure {
2679 call_id: validation.call_id.clone(),
2680 skill_name: validation.skill_name.clone(),
2681 errors: validation.errors.clone(),
2682 occurred_at_ms: unix_time_ms(validation.validated_at),
2683 })
2684 }
2685 SessionRecord::Reflection(reflection) => Some(TimelineItem::Learning {
2686 label: "reflection".to_string(),
2687 detail: reflection.summary.clone(),
2688 confidence: reflection.confidence,
2689 occurred_at_ms: unix_time_ms(reflection.created_at),
2690 }),
2691 SessionRecord::PolicyTuning(tuning) => Some(TimelineItem::Learning {
2692 label: format!("policy {:?}", tuning.action).to_ascii_lowercase(),
2693 detail: tuning.overlay.reason.clone(),
2694 confidence: tuning.overlay.confidence,
2695 occurred_at_ms: unix_time_ms(tuning.created_at),
2696 }),
2697 SessionRecord::StrategyPreference(preference) => Some(TimelineItem::Learning {
2698 label: "strategy preference".to_string(),
2699 detail: preference.reason.clone(),
2700 confidence: preference.confidence,
2701 occurred_at_ms: unix_time_ms(preference.created_at),
2702 }),
2703 _ => None,
2704 })
2705 .collect()
2706}
2707
2708fn build_tool_timeline(records: &[SessionRecord]) -> Vec<ToolTimelineItem> {
2709 let results = records
2710 .iter()
2711 .filter_map(|record| match record {
2712 SessionRecord::ToolResult(result) => Some((result.call_id.clone(), result)),
2713 _ => None,
2714 })
2715 .collect::<BTreeMap<_, _>>();
2716
2717 records
2718 .iter()
2719 .filter_map(|record| match record {
2720 SessionRecord::ToolCall(call) => Some(tool_timeline_item(
2721 call,
2722 results.get(&call.call_id).copied(),
2723 )),
2724 _ => None,
2725 })
2726 .collect()
2727}
2728
2729fn tool_timeline_item(
2730 call: &ToolCallRecord,
2731 result: Option<&ToolResultRecord>,
2732) -> ToolTimelineItem {
2733 let (success, output_preview, failure_kind, finished_at_ms) = match result {
2734 Some(result) => {
2735 let (success, preview) = tool_result_preview(result);
2736 let failure_kind = match &result.output {
2737 Ok(_) => None,
2738 Err(error) => Some(format!("{:?}", error.kind)),
2739 };
2740 (
2741 Some(success),
2742 Some(preview),
2743 failure_kind,
2744 Some(unix_time_ms(result.finished_at)),
2745 )
2746 }
2747 None => (None, None, None, None),
2748 };
2749
2750 ToolTimelineItem {
2751 call_id: call.call_id.clone(),
2752 skill_name: call.skill_name.clone(),
2753 step: call.step,
2754 called_at_ms: unix_time_ms(call.called_at),
2755 finished_at_ms,
2756 backend_kind: format!("{:?}", call.backend_kind),
2757 args: call.args.clone(),
2758 success,
2759 output_preview,
2760 failure_kind,
2761 }
2762}
2763
2764fn tool_result_preview(result: &ToolResultRecord) -> (bool, String) {
2765 match &result.output {
2766 Ok(value) => (true, preview_value(value)),
2767 Err(error) => (false, error.message.clone()),
2768 }
2769}
2770
2771fn preview_value(value: &Value) -> String {
2772 if let Some(stdout) = value.get("stdout").and_then(Value::as_str) {
2773 return truncate_preview(stdout);
2774 }
2775 if let Some(content) = value.get("content").and_then(Value::as_str) {
2776 return truncate_preview(content);
2777 }
2778 if let Some(text) = value.as_str() {
2779 return truncate_preview(text);
2780 }
2781 truncate_preview(&serde_json::to_string(value).unwrap_or_else(|_| "<unprintable>".to_string()))
2782}
2783
/// Trims surrounding whitespace and limits the text to 240 characters,
/// appending `…` when anything was cut off.
///
/// The limit counts `char`s, not bytes, so multi-byte text is never split in
/// the middle of a code point.
fn truncate_preview(value: &str) -> String {
    const MAX_PREVIEW_CHARS: usize = 240;

    let mut remaining = value.trim().chars();
    let mut preview: String = remaining.by_ref().take(MAX_PREVIEW_CHARS).collect();
    // Anything left in the iterator means the trimmed text exceeded the limit.
    if remaining.next().is_some() {
        preview.push('…');
    }
    preview
}
2796
2797fn format_tool_call(name: &str, args: &Value) -> String {
2798 match args {
2799 Value::Object(map) if map.is_empty() => format!("{name}()"),
2800 Value::Object(map) => {
2801 let rendered = map
2802 .iter()
2803 .map(|(key, value)| format!("{key}: {}", preview_value(value)))
2804 .collect::<Vec<_>>()
2805 .join(", ");
2806 format!("{name}({rendered})")
2807 }
2808 other => format!("{name}({})", preview_value(other)),
2809 }
2810}
2811
2812async fn handle_sse_stream(
2817 State(state): State<RuntimeState>,
2818 Path(session_id): Path<String>,
2819) -> Sse<impl futures_util::Stream<Item = Result<Event, std::convert::Infallible>>> {
2820 let memory = state.memory.clone();
2821 let mut last_seq: Option<i64> = None;
2822
2823 let stream = async_stream::stream! {
2824 loop {
2825 let snapshot = match memory.load_session(&session_id).await {
2826 Ok(snap) => snap,
2827 Err(_) => {
2828 tokio::time::sleep(Duration::from_secs(2)).await;
2829 continue;
2830 }
2831 };
2832
2833 let current_seq = snapshot.last_sequence_no;
2834 if current_seq != last_seq {
2835 last_seq = current_seq;
2836 let session_view = build_session_view(snapshot.clone());
2837 if let Ok(json) = serde_json::to_string(&session_view) {
2838 yield Ok(Event::default().event("session_view").data(json));
2839 }
2840 if let Ok(json) = serde_json::to_string(&session_view.execution_graph) {
2841 yield Ok(Event::default().event("execution_graph").data(json));
2842 }
2843 if let Ok(json) = serde_json::to_string(&session_view.self_improvement) {
2844 yield Ok(Event::default().event("learning").data(json));
2845 }
2846 if let Ok(json) = serde_json::to_string(&session_view.pending_approval) {
2847 yield Ok(Event::default().event("approval").data(json));
2848 }
2849 if let Ok(json) = serde_json::to_string(&session_view.wake_history) {
2850 yield Ok(Event::default().event("wake").data(json));
2851 }
2852 if let Ok(json) = serde_json::to_string(&snapshot.records) {
2853 yield Ok(Event::default().event("records").data(json));
2854 }
2855 }
2856
2857 tokio::time::sleep(Duration::from_millis(500)).await;
2858 }
2859 };
2860
2861 Sse::new(stream).keep_alive(KeepAlive::default())
2862}
2863
2864#[cfg(test)]
2865mod tests {
2866 use super::*;
2867 use async_trait::async_trait;
2868 use axum::body::Body;
2869 use axum::http::Request;
2870 use rain_engine_blob::LocalFileBlobStore;
2871 use rain_engine_core::{
2872 AgentAction, NativeSkill, PlannedSkillCall, SessionRecord, SkillExecutionError,
2873 SkillInvocation, SkillManifest, StopReason,
2874 };
2875 use serde_json::json;
2876 use tower::ServiceExt;
2877
2878 #[derive(Clone)]
2879 struct ApprovalNativeSkill;
2880
2881 #[async_trait]
2882 impl NativeSkill for ApprovalNativeSkill {
2883 async fn execute(
2884 &self,
2885 invocation: SkillInvocation,
2886 ) -> Result<serde_json::Value, SkillExecutionError> {
2887 Ok(json!({"approved": invocation.args}))
2888 }
2889
2890 fn requires_human_approval(&self) -> bool {
2891 true
2892 }
2893 }
2894
2895 fn server_config() -> RuntimeServerConfig {
2896 RuntimeServerConfig {
2897 bind_address: "127.0.0.1:0".parse().expect("addr"),
2898 request_timeout_ms: 1_000,
2899 default_policy: EnginePolicy::default(),
2900 allow_policy_overrides: true,
2901 allow_provider_overrides: true,
2902 default_provider: ProviderRequestConfig::default(),
2903 async_ingress: false,
2904 }
2905 }
2906
2907 fn runtime_state_with_mock(response: &str) -> RuntimeState {
2908 let memory: Arc<dyn MemoryStore> = Arc::new(InMemoryMemoryStore::new());
2909 let blob_store: Arc<dyn BlobStore> =
2910 Arc::from(build_blob_store(&BlobBootstrapConfig::InMemory).expect("blob store"));
2911 let llm: Arc<dyn LlmProvider> =
2912 Arc::new(MockLlmProvider::scripted(vec![AgentAction::Respond {
2913 content: response.to_string(),
2914 }]));
2915 let config = server_config();
2916 let settings = RuntimeMutableSettings::defaults(
2917 config.default_policy.clone(),
2918 config.default_provider.clone(),
2919 );
2920 RuntimeState::new(
2921 AgentEngine::new(llm, memory.clone()),
2922 memory,
2923 blob_store,
2924 config,
2925 settings,
2926 )
2927 }
2928
2929 #[tokio::test]
2930 async fn webhook_route_converts_json_request_into_agent_trigger() {
2931 let state = runtime_state_with_mock("processed");
2932
2933 let response = app(state)
2934 .oneshot(
2935 Request::post("/triggers/webhook/github")
2936 .header("content-type", "application/json")
2937 .body(Body::from(
2938 serde_json::to_vec(&WebhookIngressRequest {
2939 session_id: "runtime-session".to_string(),
2940 payload: serde_json::json!({"action": "opened"}),
2941 attachments: Vec::new(),
2942 granted_scopes: BTreeSet::new(),
2943 idempotency_key: Some("abc".to_string()),
2944 provider: None,
2945 policy_override: None,
2946 })
2947 .expect("request json"),
2948 ))
2949 .expect("request"),
2950 )
2951 .await
2952 .expect("response");
2953
2954 assert_eq!(response.status(), StatusCode::OK);
2955 let bytes = axum::body::to_bytes(response.into_body(), usize::MAX)
2956 .await
2957 .expect("body");
2958 let run: RuntimeRunResult = serde_json::from_slice(&bytes).expect("run result");
2959 assert_eq!(run.outcome.stop_reason, StopReason::Responded);
2960 assert_eq!(run.outcome.response.as_deref(), Some("processed"));
2961 assert!(!run.advances.is_empty());
2962 }
2963
2964 #[tokio::test]
2965 async fn capabilities_route_exposes_runtime_surface() {
2966 let state = runtime_state_with_mock("processed");
2967
2968 let response = app(state)
2969 .oneshot(
2970 Request::get("/capabilities")
2971 .body(Body::empty())
2972 .expect("request"),
2973 )
2974 .await
2975 .expect("response");
2976
2977 assert_eq!(response.status(), StatusCode::OK);
2978 let bytes = axum::body::to_bytes(response.into_body(), usize::MAX)
2979 .await
2980 .expect("body");
2981 let capabilities: RuntimeCapabilities =
2982 serde_json::from_slice(&bytes).expect("capabilities");
2983 assert!(capabilities.streaming);
2984 assert!(capabilities.approvals);
2985 assert!(capabilities.wake_support);
2986 assert!(capabilities.learning_support);
2987 assert!(
2988 capabilities
2989 .default_scopes
2990 .contains(&"tool:run".to_string())
2991 );
2992 }
2993
2994 #[tokio::test]
2995 async fn session_view_projects_records_for_control_room() {
2996 let state = runtime_state_with_mock("processed");
2997 let router = app(state.clone());
2998
2999 let response = router
3000 .oneshot(
3001 Request::post("/triggers/webhook/github")
3002 .header("content-type", "application/json")
3003 .body(Body::from(
3004 serde_json::to_vec(&WebhookIngressRequest {
3005 session_id: "view-session".to_string(),
3006 payload: serde_json::json!({"action": "opened"}),
3007 attachments: Vec::new(),
3008 granted_scopes: BTreeSet::new(),
3009 idempotency_key: None,
3010 provider: None,
3011 policy_override: None,
3012 })
3013 .expect("request json"),
3014 ))
3015 .expect("request"),
3016 )
3017 .await
3018 .expect("response");
3019 assert_eq!(response.status(), StatusCode::OK);
3020
3021 let view_response = app(state)
3022 .oneshot(
3023 Request::get("/sessions/view-session/view")
3024 .body(Body::empty())
3025 .expect("request"),
3026 )
3027 .await
3028 .expect("response");
3029
3030 assert_eq!(view_response.status(), StatusCode::OK);
3031 let bytes = axum::body::to_bytes(view_response.into_body(), usize::MAX)
3032 .await
3033 .expect("body");
3034 let view: SessionView = serde_json::from_slice(&bytes).expect("session view");
3035 assert_eq!(view.status, SessionStatus::Completed);
3036 assert_eq!(view.activity_state, SessionActivityState::Idle);
3037 assert!(view.record_count >= 3);
3038 assert!(view.current_focus.is_some());
3039 assert!(
3040 view.timeline
3041 .iter()
3042 .any(|item| matches!(item, TimelineItem::AssistantResponse { .. }))
3043 );
3044 assert!(!view.self_improvement.reflections.is_empty());
3045 }
3046
3047 #[tokio::test]
3048 async fn session_list_views_include_presence_metadata() {
3049 let state = runtime_state_with_mock("processed");
3050 let router = app(state.clone());
3051
3052 let response = router
3053 .oneshot(
3054 Request::post("/triggers/human/telegram:42")
3055 .header("content-type", "application/json")
3056 .body(Body::from(
3057 serde_json::to_vec(&HumanInputIngressRequest {
3058 session_id: "presence-session".to_string(),
3059 content: "check status".to_string(),
3060 attachments: Vec::new(),
3061 granted_scopes: BTreeSet::new(),
3062 idempotency_key: None,
3063 provider: None,
3064 policy_override: None,
3065 })
3066 .expect("request json"),
3067 ))
3068 .expect("request"),
3069 )
3070 .await
3071 .expect("response");
3072 assert_eq!(response.status(), StatusCode::OK);
3073
3074 let list_response = app(state)
3075 .oneshot(
3076 Request::get("/sessions/views")
3077 .body(Body::empty())
3078 .expect("request"),
3079 )
3080 .await
3081 .expect("response");
3082
3083 assert_eq!(list_response.status(), StatusCode::OK);
3084 let bytes = axum::body::to_bytes(list_response.into_body(), usize::MAX)
3085 .await
3086 .expect("body");
3087 let views: Vec<SessionListItemView> =
3088 serde_json::from_slice(&bytes).expect("session list views");
3089 let view = views
3090 .iter()
3091 .find(|session| session.session_id == "presence-session")
3092 .expect("presence session view");
3093 assert_eq!(view.status, SessionStatus::Completed);
3094 assert_eq!(view.activity_state, SessionActivityState::Idle);
3095 assert!(view.current_focus.is_some());
3096 assert_eq!(view.active_channel_ids, vec!["telegram".to_string()]);
3097 assert!(view.record_count >= 3);
3098 }
3099
3100 #[tokio::test]
3101 async fn multipart_webhook_persists_blob_attachment_references() {
3102 let memory: Arc<dyn MemoryStore> = Arc::new(InMemoryMemoryStore::new());
3103 let blob_store: Arc<dyn BlobStore> =
3104 Arc::from(build_blob_store(&BlobBootstrapConfig::InMemory).expect("blob store"));
3105 let llm: Arc<dyn LlmProvider> =
3106 Arc::new(MockLlmProvider::scripted(vec![AgentAction::Respond {
3107 content: "processed".to_string(),
3108 }]));
3109 let mut config = server_config();
3110 config.default_policy.max_inline_attachment_bytes = 4;
3111 let settings = RuntimeMutableSettings::defaults(
3112 config.default_policy.clone(),
3113 config.default_provider.clone(),
3114 );
3115 let state = RuntimeState::new(
3116 AgentEngine::new(llm, memory.clone()),
3117 memory.clone(),
3118 blob_store,
3119 config,
3120 settings,
3121 );
3122
3123 let boundary = "rain-engine-boundary";
3124 let body = format!(
3125 "--{boundary}\r\nContent-Disposition: form-data; name=\"session_id\"\r\n\r\nmulti-session\r\n\
3126--{boundary}\r\nContent-Disposition: form-data; name=\"payload\"\r\n\r\n{{\"event\":\"upload\"}}\r\n\
3127--{boundary}\r\nContent-Disposition: form-data; name=\"attachment\"; filename=\"schema.png\"\r\nContent-Type: image/png\r\n\r\n123456789\r\n\
3128--{boundary}--\r\n"
3129 );
3130
3131 let response = app(state.clone())
3132 .oneshot(
3133 Request::post("/triggers/webhook/files")
3134 .header(
3135 "content-type",
3136 format!("multipart/form-data; boundary={boundary}"),
3137 )
3138 .body(Body::from(body.into_bytes()))
3139 .expect("request"),
3140 )
3141 .await
3142 .expect("response");
3143
3144 assert_eq!(response.status(), StatusCode::OK);
3145 let snapshot = state
3146 .memory()
3147 .load_session("multi-session")
3148 .await
3149 .expect("session");
3150 let trigger = snapshot
3151 .records
3152 .into_iter()
3153 .find_map(|record| match record {
3154 SessionRecord::Trigger(record) => Some(record.trigger),
3155 _ => None,
3156 })
3157 .expect("trigger");
3158 match trigger {
3159 AgentTrigger::Webhook { attachments, .. } => {
3160 assert_eq!(attachments.len(), 1);
3161 assert!(matches!(
3162 attachments[0].content,
3163 rain_engine_core::AttachmentContent::Blob { .. }
3164 ));
3165 }
3166 other => panic!("unexpected trigger: {other:?}"),
3167 }
3168 }
3169
3170 #[tokio::test]
3171 async fn approval_route_resumes_suspended_native_skill() {
3172 let memory: Arc<dyn MemoryStore> = Arc::new(InMemoryMemoryStore::new());
3173 let blob_store: Arc<dyn BlobStore> =
3174 Arc::from(build_blob_store(&BlobBootstrapConfig::InMemory).expect("blob store"));
3175 let llm: Arc<dyn LlmProvider> = Arc::new(MockLlmProvider::scripted(vec![
3176 AgentAction::CallSkills(vec![PlannedSkillCall {
3177 call_id: "native-call".to_string(),
3178 name: "dangerous_native".to_string(),
3179 args: json!({"apply": true}),
3180 priority: 0,
3181 depends_on: Vec::new(),
3182 retry_policy: Default::default(),
3183 dry_run: false,
3184 }]),
3185 AgentAction::Respond {
3186 content: "completed".to_string(),
3187 },
3188 ]));
3189 let config = server_config();
3190 let settings = RuntimeMutableSettings::defaults(
3191 config.default_policy.clone(),
3192 config.default_provider.clone(),
3193 );
3194 let state = RuntimeState::new(
3195 AgentEngine::new(llm, memory.clone()),
3196 memory,
3197 blob_store,
3198 config,
3199 settings,
3200 );
3201 state.engine().register_native_skill(
3202 SkillManifest {
3203 name: "dangerous_native".to_string(),
3204 description: "Requires approval".to_string(),
3205 input_schema: json!({"type":"object"}),
3206 required_scopes: vec!["tool:run".to_string()],
3207 capability_grants: vec![],
3208 resource_policy: rain_engine_core::ResourcePolicy::default_for_tools(),
3209 approval_required: true,
3210 circuit_breaker_threshold: 0.5,
3211 },
3212 Arc::new(ApprovalNativeSkill),
3213 );
3214
3215 let start = app(state.clone())
3216 .oneshot(
3217 Request::post("/triggers/webhook/github")
3218 .header("content-type", "application/json")
3219 .body(Body::from(
3220 serde_json::to_vec(&WebhookIngressRequest {
3221 session_id: "approval-session".to_string(),
3222 payload: json!({"action": "deploy"}),
3223 attachments: Vec::new(),
3224 granted_scopes: BTreeSet::from(["tool:run".to_string()]),
3225 idempotency_key: None,
3226 provider: None,
3227 policy_override: None,
3228 })
3229 .expect("request json"),
3230 ))
3231 .expect("request"),
3232 )
3233 .await
3234 .expect("response");
3235
3236 let start_bytes = axum::body::to_bytes(start.into_body(), usize::MAX)
3237 .await
3238 .expect("body");
3239 let suspended: RuntimeRunResult = serde_json::from_slice(&start_bytes).expect("outcome");
3240 assert_eq!(suspended.outcome.stop_reason, StopReason::Suspended);
3241 let resume_token = suspended.outcome.resume_token.expect("resume token").0;
3242
3243 let resume = app(state)
3244 .oneshot(
3245 Request::post("/triggers/approval")
3246 .header("content-type", "application/json")
3247 .body(Body::from(
3248 serde_json::to_vec(&ApprovalIngressRequest {
3249 session_id: "approval-session".to_string(),
3250 resume_token,
3251 decision: ApprovalDecision::Approved,
3252 metadata: json!({"approved_by": "tester"}),
3253 granted_scopes: BTreeSet::from(["tool:run".to_string()]),
3254 provider: None,
3255 policy_override: None,
3256 })
3257 .expect("request json"),
3258 ))
3259 .expect("request"),
3260 )
3261 .await
3262 .expect("response");
3263
3264 let resume_bytes = axum::body::to_bytes(resume.into_body(), usize::MAX)
3265 .await
3266 .expect("body");
3267 let resumed: RuntimeRunResult = serde_json::from_slice(&resume_bytes).expect("outcome");
3268 assert_eq!(resumed.outcome.stop_reason, StopReason::Responded);
3269 assert_eq!(resumed.outcome.response.as_deref(), Some("completed"));
3270 }
3271
3272 #[tokio::test]
3273 async fn invalid_runtime_config_fails_fast() {
3274 let result = build_runtime_state(RuntimeBootstrapConfig {
3275 server: server_config(),
3276 store: StoreBootstrapConfig::Sqlite {
3277 database_url: "".to_string(),
3278 },
3279 cache: None,
3280 blob: BlobBootstrapConfig::InMemory,
3281 provider: ProviderBootstrapConfig::Mock {
3282 response: "processed".to_string(),
3283 },
3284 enable_research_planner: false,
3285 })
3286 .await;
3287
3288 match result {
3289 Ok(_) => panic!("expected config error"),
3290 Err(error) => assert!(error.to_string().contains("must not be empty")),
3291 }
3292 }
3293
/// `LocalFileBlobStore::path_from_uri` must reject the `memory://abc` URI with
/// the message `"unsupported local blob uri"`.
#[test]
fn local_directory_blob_store_rejects_invalid_uri() {
    let rejection = LocalFileBlobStore::path_from_uri("memory://abc").expect_err("error");

    assert_eq!(rejection.message, "unsupported local blob uri");
}
3299}