adk-runner 0.6.0

Agent execution runtime for Rust Agent Development Kit (ADK-Rust) agents
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
use adk_core::{
    AdkIdentity, Agent, AppName, Artifacts, CallbackContext, Content, Event, ExecutionIdentity,
    InvocationContext as InvocationContextTrait, InvocationId, Memory, ReadonlyContext,
    RequestContext, RunConfig, SessionId, UserId,
};
use adk_session::Session as AdkSession;
use async_trait::async_trait;
use std::collections::HashMap;
use std::sync::{Arc, RwLock, atomic::AtomicBool};

/// MutableSession wraps a session with shared mutable state.
///
/// This mirrors ADK-Go's MutableSession pattern where state changes from
/// events are immediately visible to all agents sharing the same context.
/// This is critical for SequentialAgent/LoopAgent patterns where downstream
/// agents need to read state set by upstream agents via output_key.
///
/// The state map and the event list are seeded from the wrapped session in
/// [`MutableSession::new`] and kept behind `Arc<RwLock<..>>`, so they can be
/// updated through a shared (`&self`) reference.
pub struct MutableSession {
    /// The original session snapshot (for metadata like id, app_name, user_id).
    /// Those three accessors are answered by delegating to this value.
    inner: Arc<dyn AdkSession>,
    /// Shared mutable state - updated when events are processed.
    /// This is the key difference from the old SessionAdapter which used immutable snapshots.
    /// Written by `apply_state_delta` and `State::set`; read by `State::get` / `State::all`.
    state: Arc<RwLock<HashMap<String, serde_json::Value>>>,
    /// Accumulated events during this invocation (uses adk_core::Event which is re-exported by adk_session).
    /// Starts with the wrapped session's events and grows through `append_event`.
    events: Arc<RwLock<Vec<Event>>>,
}

impl MutableSession {
    /// Create a new MutableSession from a session snapshot.
    /// The state is copied from the session and becomes mutable.
    pub fn new(session: Arc<dyn AdkSession>) -> Self {
        // Clone the initial state from the session
        let initial_state = session.state().all();
        // Clone the initial events
        let initial_events = session.events().all();

        Self {
            inner: session,
            state: Arc::new(RwLock::new(initial_state)),
            events: Arc::new(RwLock::new(initial_events)),
        }
    }

    /// Apply state delta from an event to the mutable state.
    /// This is called by the Runner when events are yielded.
    pub fn apply_state_delta(&self, delta: &HashMap<String, serde_json::Value>) {
        if delta.is_empty() {
            return;
        }

        let Ok(mut state) = self.state.write() else {
            tracing::error!("state RwLock poisoned in apply_state_delta — skipping delta");
            return;
        };
        for (key, value) in delta {
            // Skip temp: prefixed keys (they shouldn't persist)
            if !key.starts_with("temp:") {
                state.insert(key.clone(), value.clone());
            }
        }
    }

    /// Append an event to the session's event list.
    /// This keeps the in-memory view consistent.
    pub fn append_event(&self, event: Event) {
        let Ok(mut events) = self.events.write() else {
            tracing::error!("events RwLock poisoned in append_event — event dropped");
            return;
        };
        events.push(event);
    }

    /// Get a snapshot of all events in the session.
    /// Used by the runner for compaction decisions.
    pub fn events_snapshot(&self) -> Vec<Event> {
        let Ok(events) = self.events.read() else {
            tracing::error!("events RwLock poisoned in events_snapshot — returning empty");
            return Vec::new();
        };
        events.clone()
    }

    /// Return the number of accumulated events without cloning the full list.
    pub fn events_len(&self) -> usize {
        let Ok(events) = self.events.read() else {
            tracing::error!("events RwLock poisoned in events_len — returning 0");
            return 0;
        };
        events.len()
    }

    /// Build conversation history, optionally filtered for a specific agent.
    ///
    /// When `agent_name` is `Some`, events authored by other agents (not "user",
    /// not the named agent, and not function/tool responses) are excluded. This
    /// prevents a transferred sub-agent from seeing the parent's tool calls
    /// mapped as "model" role, which would cause the LLM to think work is
    /// already done.
    ///
    /// When `agent_name` is `None`, all events are included (backward-compatible).
    pub fn conversation_history_for_agent_impl(
        &self,
        agent_name: Option<&str>,
    ) -> Vec<adk_core::Content> {
        let Ok(events) = self.events.read() else {
            tracing::error!("events RwLock poisoned in conversation_history — returning empty");
            return Vec::new();
        };
        let mut history = Vec::new();

        // Find the most recent compaction event — everything before its
        // end_timestamp has been summarized and should be replaced by the
        // compacted content.
        let mut compaction_boundary = None;
        for event in events.iter().rev() {
            if let Some(ref compaction) = event.actions.compaction {
                history.push(compaction.compacted_content.clone());
                compaction_boundary = Some(compaction.end_timestamp);
                break;
            }
        }

        for event in events.iter() {
            // Skip the compaction event itself
            if event.actions.compaction.is_some() {
                continue;
            }

            // Skip events that were already compacted
            if let Some(boundary) = compaction_boundary {
                if event.timestamp <= boundary {
                    continue;
                }
            }

            // When filtering for a specific agent, skip events from other agents.
            // Keep: user messages and the agent's own events.
            // Skip: other agents' events entirely (model-role, function calls,
            // and function/tool responses). This prevents the sub-agent from
            // seeing orphaned function responses without their preceding calls.
            if let Some(name) = agent_name {
                if event.author != "user" && event.author != name {
                    continue;
                }
            }

            if let Some(content) = &event.llm_response.content {
                let mut mapped_content = content.clone();
                mapped_content.role = match (event.author.as_str(), content.role.as_str()) {
                    ("user", _) => "user",
                    (_, "function" | "tool") => content.role.as_str(),
                    _ => "model",
                }
                .to_string();
                history.push(mapped_content);
            }
        }

        history
    }
}

/// `adk_core::Session` view of a [`MutableSession`].
///
/// Identity metadata is answered by the wrapped session; state and
/// conversation history are answered from the mutable copies held here.
impl adk_core::Session for MutableSession {
    /// Session id, delegated to the wrapped session.
    fn id(&self) -> &str {
        self.inner.id()
    }

    /// Application name, delegated to the wrapped session.
    fn app_name(&self) -> &str {
        self.inner.app_name()
    }

    /// User id, delegated to the wrapped session.
    fn user_id(&self) -> &str {
        self.inner.user_id()
    }

    /// The session's state view. `MutableSession` implements
    /// `adk_core::State` itself, so it hands out a reference to itself.
    fn state(&self) -> &dyn adk_core::State {
        self
    }

    /// Conversation history with no per-agent filtering.
    fn conversation_history(&self) -> Vec<adk_core::Content> {
        self.conversation_history_for_agent_impl(None)
    }

    /// Conversation history filtered for `agent_name`.
    fn conversation_history_for_agent(&self, agent_name: &str) -> Vec<adk_core::Content> {
        self.conversation_history_for_agent_impl(Some(agent_name))
    }
}

/// `adk_core::State` backed by the shared, lock-protected state map.
///
/// A poisoned lock is never propagated as a panic: each method logs an error
/// and falls back to an empty result (or drops the write).
impl adk_core::State for MutableSession {
    /// Look up `key` and return a clone of its value, if any.
    ///
    /// Returns `None` when the key is absent or the state lock is poisoned.
    fn get(&self, key: &str) -> Option<serde_json::Value> {
        match self.state.read() {
            Ok(map) => map.get(key).cloned(),
            Err(_) => {
                tracing::error!("state RwLock poisoned in State::get — returning None");
                None
            }
        }
    }

    /// Store `value` under `key`.
    ///
    /// Keys rejected by `adk_core::validate_state_key` are logged as a warning
    /// and ignored. The write is also dropped when the state lock is poisoned.
    fn set(&mut self, key: String, value: serde_json::Value) {
        if let Err(msg) = adk_core::validate_state_key(&key) {
            tracing::warn!(key = %key, "rejecting invalid state key: {msg}");
            return;
        }

        match self.state.write() {
            Ok(mut map) => {
                map.insert(key, value);
            }
            Err(_) => {
                tracing::error!("state RwLock poisoned in State::set — value dropped");
            }
        }
    }

    /// Return a clone of the whole state map.
    ///
    /// Returns an empty map when the state lock is poisoned.
    fn all(&self) -> HashMap<String, serde_json::Value> {
        match self.state.read() {
            Ok(map) => map.clone(),
            Err(_) => {
                tracing::error!("state RwLock poisoned in State::all — returning empty");
                HashMap::new()
            }
        }
    }
}

/// Per-invocation context handed to agents while they run.
///
/// It carries the execution identity, the agent being run, the triggering
/// user content, optional services (artifacts, memory, shared state), the run
/// configuration, and the [`MutableSession`] for this invocation.
pub struct InvocationContext {
    /// App name, user id, session id, invocation id, branch and agent name
    /// for this invocation.
    identity: ExecutionIdentity,
    /// The agent this context was created for.
    agent: Arc<dyn Agent>,
    /// The user content supplied when the context was created.
    user_content: Content,
    /// Optional artifact service; `None` until `with_artifacts` is called.
    artifacts: Option<Arc<dyn Artifacts>>,
    /// Optional memory service; `None` until `with_memory` is called.
    memory: Option<Arc<dyn Memory>>,
    /// Run configuration; `RunConfig::default()` until `with_run_config` is called.
    run_config: RunConfig,
    /// Flag set by `end_invocation` and read by `ended`.
    ended: Arc<AtomicBool>,
    /// Mutable session that allows state to be updated during execution.
    /// This is shared across all agents in a workflow, enabling state
    /// propagation between sequential/parallel agents.
    session: Arc<MutableSession>,
    /// Optional request context from the server's auth middleware bridge.
    /// When present, `user_id()` returns `request_context.user_id` and
    /// `user_scopes()` returns `request_context.scopes`.
    request_context: Option<RequestContext>,
    /// Optional shared state for parallel agent coordination.
    shared_state: Option<Arc<adk_core::SharedState>>,
}

impl InvocationContext {
    /// Create a new invocation context from validated typed identifiers.
    pub fn new_typed(
        invocation_id: String,
        agent: Arc<dyn Agent>,
        user_id: UserId,
        app_name: AppName,
        session_id: SessionId,
        user_content: Content,
        session: Arc<dyn AdkSession>,
    ) -> adk_core::Result<Self> {
        let identity = ExecutionIdentity {
            adk: AdkIdentity { app_name, user_id, session_id },
            invocation_id: InvocationId::try_from(invocation_id)?,
            branch: String::new(),
            agent_name: agent.name().to_string(),
        };
        Ok(Self {
            identity,
            agent,
            user_content,
            artifacts: None,
            memory: None,
            run_config: RunConfig::default(),
            ended: Arc::new(AtomicBool::new(false)),
            session: Arc::new(MutableSession::new(session)),
            request_context: None,
            shared_state: None,
        })
    }

    pub fn new(
        invocation_id: String,
        agent: Arc<dyn Agent>,
        user_id: String,
        app_name: String,
        session_id: String,
        user_content: Content,
        session: Arc<dyn AdkSession>,
    ) -> adk_core::Result<Self> {
        Self::new_typed(
            invocation_id,
            agent,
            UserId::try_from(user_id)?,
            AppName::try_from(app_name)?,
            SessionId::try_from(session_id)?,
            user_content,
            session,
        )
    }

    /// Create an invocation context that reuses an existing mutable session and
    /// validated typed identifiers.
    pub fn with_mutable_session_typed(
        invocation_id: String,
        agent: Arc<dyn Agent>,
        user_id: UserId,
        app_name: AppName,
        session_id: SessionId,
        user_content: Content,
        session: Arc<MutableSession>,
    ) -> adk_core::Result<Self> {
        let identity = ExecutionIdentity {
            adk: AdkIdentity { app_name, user_id, session_id },
            invocation_id: InvocationId::try_from(invocation_id)?,
            branch: String::new(),
            agent_name: agent.name().to_string(),
        };
        Ok(Self {
            identity,
            agent,
            user_content,
            artifacts: None,
            memory: None,
            run_config: RunConfig::default(),
            ended: Arc::new(AtomicBool::new(false)),
            session,
            request_context: None,
            shared_state: None,
        })
    }

    /// Create an InvocationContext with an existing MutableSession.
    /// This allows sharing the same mutable session across multiple contexts
    /// (e.g., for agent transfers).
    pub fn with_mutable_session(
        invocation_id: String,
        agent: Arc<dyn Agent>,
        user_id: String,
        app_name: String,
        session_id: String,
        user_content: Content,
        session: Arc<MutableSession>,
    ) -> adk_core::Result<Self> {
        Self::with_mutable_session_typed(
            invocation_id,
            agent,
            UserId::try_from(user_id)?,
            AppName::try_from(app_name)?,
            SessionId::try_from(session_id)?,
            user_content,
            session,
        )
    }

    pub fn with_branch(mut self, branch: String) -> Self {
        self.identity.branch = branch;
        self
    }

    pub fn with_artifacts(mut self, artifacts: Arc<dyn Artifacts>) -> Self {
        self.artifacts = Some(artifacts);
        self
    }

    pub fn with_memory(mut self, memory: Arc<dyn Memory>) -> Self {
        self.memory = Some(memory);
        self
    }

    pub fn with_run_config(mut self, config: RunConfig) -> Self {
        self.run_config = config;
        self
    }

    /// Set the request context from the server's auth middleware bridge.
    ///
    /// When set, `user_id()` returns `request_context.user_id` (overriding
    /// the session-scoped identity), and `user_scopes()` returns
    /// `request_context.scopes`. This is the explicit authenticated user
    /// override — `RequestContext` remains separate from `ExecutionIdentity`
    /// and `AdkIdentity` (it does not carry session or invocation IDs).
    pub fn with_request_context(mut self, ctx: RequestContext) -> Self {
        self.request_context = Some(ctx);
        self
    }

    /// Set the shared state for parallel agent coordination.
    pub fn with_shared_state(mut self, shared: Arc<adk_core::SharedState>) -> Self {
        self.shared_state = Some(shared);
        self
    }

    /// Get a reference to the mutable session.
    /// This allows the Runner to apply state deltas when events are processed.
    pub fn mutable_session(&self) -> &Arc<MutableSession> {
        &self.session
    }
}

#[async_trait]
impl ReadonlyContext for InvocationContext {
    /// Invocation id stored in the execution identity.
    fn invocation_id(&self) -> &str {
        self.identity.invocation_id.as_ref()
    }

    /// Name reported by the agent this context was created for.
    fn agent_name(&self) -> &str {
        self.agent.name()
    }

    /// Effective user id for this invocation.
    ///
    /// A `RequestContext` installed through `with_request_context` carries the
    /// authenticated caller, so its `user_id` wins over the session-scoped
    /// identity. Without one, the identity's user id is returned.
    fn user_id(&self) -> &str {
        if let Some(rc) = &self.request_context {
            return &rc.user_id;
        }
        self.identity.adk.user_id.as_ref()
    }

    /// Application name stored in the execution identity.
    fn app_name(&self) -> &str {
        self.identity.adk.app_name.as_ref()
    }

    /// Session id stored in the execution identity.
    fn session_id(&self) -> &str {
        self.identity.adk.session_id.as_ref()
    }

    /// Branch stored in the execution identity (empty unless `with_branch` was used).
    fn branch(&self) -> &str {
        &self.identity.branch
    }

    /// The user content this context was created with.
    fn user_content(&self) -> &Content {
        &self.user_content
    }
}

#[async_trait]
impl CallbackContext for InvocationContext {
    /// A new handle to the artifact service, or `None` when none was attached.
    fn artifacts(&self) -> Option<Arc<dyn Artifacts>> {
        self.artifacts.as_ref().map(Arc::clone)
    }

    /// A new handle to the shared state, or `None` when none was attached.
    fn shared_state(&self) -> Option<Arc<adk_core::SharedState>> {
        self.shared_state.as_ref().map(Arc::clone)
    }
}

#[async_trait]
impl InvocationContextTrait for InvocationContext {
    /// A new handle to the agent this context was created for.
    fn agent(&self) -> Arc<dyn Agent> {
        Arc::clone(&self.agent)
    }

    /// A new handle to the memory service, or `None` when none was attached.
    fn memory(&self) -> Option<Arc<dyn Memory>> {
        self.memory.as_ref().map(Arc::clone)
    }

    /// The mutable session, exposed through the `adk_core::Session` trait.
    fn session(&self) -> &dyn adk_core::Session {
        &*self.session
    }

    /// The run configuration in effect for this invocation.
    fn run_config(&self) -> &RunConfig {
        &self.run_config
    }

    /// Mark the invocation as ended.
    fn end_invocation(&self) {
        use std::sync::atomic::Ordering;
        self.ended.store(true, Ordering::SeqCst);
    }

    /// Whether `end_invocation` has been called on this context.
    fn ended(&self) -> bool {
        use std::sync::atomic::Ordering;
        self.ended.load(Ordering::SeqCst)
    }

    /// Scopes from the request context, or an empty list when there is none.
    fn user_scopes(&self) -> Vec<String> {
        match &self.request_context {
            Some(rc) => rc.scopes.clone(),
            None => Vec::new(),
        }
    }

    /// Request metadata with every value wrapped as a JSON string.
    ///
    /// Returns an empty map when no request context is set.
    fn request_metadata(&self) -> HashMap<String, serde_json::Value> {
        let Some(rc) = self.request_context.as_ref() else {
            return HashMap::new();
        };

        let mut metadata = HashMap::new();
        for (name, text) in rc.metadata.iter() {
            metadata.insert(name.clone(), serde_json::Value::String(text.clone()));
        }
        metadata
    }
}