1use std::sync::{Arc, OnceLock};
7
8use crate::runtime::RuntimeSessionState;
9use crate::{SessionPolicy, SessionSnapshot};
10
11use super::PluginError;
12
13#[derive(Clone, Debug)]
14pub struct SessionReadView(Arc<SessionReadState>);
15
16#[derive(Debug)]
17struct SessionReadState {
18 meta: SessionReadMeta,
19 graph: SessionReadGraph,
20 read_model: crate::session_graph::SessionReadModel,
21 chronological_projection: OnceLock<Arc<crate::ChronologicalProjection>>,
22}
23
24#[derive(Clone, Debug)]
25struct SessionReadMeta {
26 session_id: String,
27 policy: SessionPolicy,
28 turn_index: usize,
29 token_usage: crate::TokenUsage,
30 last_prompt_usage: Option<crate::runtime::PromptUsage>,
31 protocol_turn_options: crate::ProtocolTurnOptions,
32}
33
34impl SessionReadMeta {
35 fn from_snapshot_ref(snapshot: &SessionSnapshot) -> Self {
36 Self {
37 session_id: snapshot.session_id.clone(),
38 policy: snapshot.policy.clone(),
39 turn_index: snapshot.turn_index,
40 token_usage: snapshot.token_usage.clone(),
41 last_prompt_usage: snapshot.last_prompt_usage.clone(),
42 protocol_turn_options: snapshot.protocol_turn_options.clone(),
43 }
44 }
45
46 fn from_persisted_ref(state: &RuntimeSessionState) -> Self {
47 Self {
48 session_id: state.session_id.clone(),
49 policy: state.policy.clone(),
50 turn_index: state.turn_index,
51 token_usage: state.token_usage.clone(),
52 last_prompt_usage: state.last_prompt_usage.clone(),
53 protocol_turn_options: state.protocol_turn_options.clone(),
54 }
55 }
56
57 fn with_policy(mut self, policy: SessionPolicy) -> Self {
58 self.policy = policy;
59 self
60 }
61
62 fn with_turn_index(mut self, turn_index: usize) -> Self {
63 self.turn_index = turn_index;
64 self
65 }
66
67 fn with_protocol_turn_options(
68 mut self,
69 protocol_turn_options: crate::ProtocolTurnOptions,
70 ) -> Self {
71 self.protocol_turn_options = protocol_turn_options;
72 self
73 }
74
75 fn to_snapshot(&self, session_graph: crate::SessionGraph) -> SessionSnapshot {
76 SessionSnapshot {
77 session_id: self.session_id.clone(),
78 policy: self.policy.clone(),
79 agent_frames: Vec::new(),
80 current_agent_frame_id: String::new(),
81 session_graph,
82 turn_index: self.turn_index,
83 token_usage: self.token_usage.clone(),
84 last_prompt_usage: self.last_prompt_usage.clone(),
85 protocol_turn_options: self.protocol_turn_options.clone(),
86 tool_state_ref: None,
87 tool_state_generation: None,
88 plugin_snapshot_ref: None,
89 plugin_snapshot_revision: None,
90 execution_state_ref: None,
91 token_ledger: Vec::new(),
92 checkpoint_ref: None,
93 }
94 }
95}
96
97#[derive(Debug)]
98enum SessionReadGraph {
99 Owned(crate::SessionGraph),
100 Derived {
101 cache: OnceLock<crate::SessionGraph>,
102 base_graph: Arc<crate::SessionGraph>,
103 },
104}
105
106impl SessionReadView {
107 fn from_graph_message_sequence_meta(
108 meta: SessionReadMeta,
109 base_graph: Arc<crate::SessionGraph>,
110 messages: crate::MessageSequence,
111 active_events: Arc<Vec<crate::SessionEventRecord>>,
112 ) -> Self {
113 Self(Arc::new(SessionReadState {
114 meta,
115 graph: SessionReadGraph::Derived {
116 cache: OnceLock::new(),
117 base_graph,
118 },
119 read_model: crate::session_graph::SessionReadModel {
120 active_events,
121 messages: messages.shared(),
122 prompt_render_cache: Arc::new(crate::BaseRenderCache::new()),
123 },
124 chronological_projection: OnceLock::new(),
125 }))
126 }
127
128 pub fn from_snapshot(snapshot: &SessionSnapshot) -> Self {
129 let read_model = snapshot.read_model();
130 Self(Arc::new(SessionReadState {
131 meta: SessionReadMeta::from_snapshot_ref(snapshot),
132 graph: SessionReadGraph::Owned(snapshot.session_graph.clone()),
133 read_model,
134 chronological_projection: OnceLock::new(),
135 }))
136 }
137
138 pub fn from_persisted_state(state: &RuntimeSessionState) -> Self {
139 let graph = state.session_graph.clone();
140 let read_model = state.read_model();
141 Self(Arc::new(SessionReadState {
142 meta: SessionReadMeta::from_persisted_ref(state),
143 graph: SessionReadGraph::Owned(graph),
144 read_model,
145 chronological_projection: OnceLock::new(),
146 }))
147 }
148
149 pub(crate) fn from_runtime_state(
150 state: &RuntimeSessionState,
151 policy: SessionPolicy,
152 protocol_turn_options: crate::ProtocolTurnOptions,
153 ) -> Self {
154 let graph = state.session_graph.clone();
155 let read_model = state.read_model();
156 Self(Arc::new(SessionReadState {
157 meta: SessionReadMeta::from_persisted_ref(state)
158 .with_policy(policy)
159 .with_protocol_turn_options(protocol_turn_options),
160 graph: SessionReadGraph::Owned(graph),
161 read_model,
162 chronological_projection: OnceLock::new(),
163 }))
164 }
165
166 pub(crate) fn derived_from_persisted_state(
167 state: &RuntimeSessionState,
168 policy: SessionPolicy,
169 turn_index: usize,
170 protocol_turn_options: crate::ProtocolTurnOptions,
171 base_graph: Arc<crate::SessionGraph>,
172 messages: crate::MessageSequence,
173 ) -> Self {
174 let active_events = base_graph
175 .read_model_for_agent_frame(
176 &state.current_agent_frame_id,
177 state
178 .current_agent_frame()
179 .map(|frame| frame.previous_frame_id.is_none())
180 .unwrap_or(true),
181 )
182 .active_events;
183 Self::from_graph_message_sequence_meta(
184 SessionReadMeta::from_persisted_ref(state)
185 .with_policy(policy)
186 .with_turn_index(turn_index)
187 .with_protocol_turn_options(protocol_turn_options),
188 base_graph,
189 messages,
190 active_events,
191 )
192 }
193
194 pub fn session_graph(&self) -> &crate::SessionGraph {
195 match &self.0.graph {
196 SessionReadGraph::Owned(graph) => graph,
197 SessionReadGraph::Derived { cache, base_graph } => cache.get_or_init(|| {
198 let mut graph = (**base_graph).clone();
199 graph.replace_active_read_state(self.0.read_model.messages.as_slice());
200 graph
201 }),
202 }
203 }
204
205 pub fn session_id(&self) -> &str {
206 &self.0.meta.session_id
207 }
208
209 pub fn policy(&self) -> &SessionPolicy {
210 &self.0.meta.policy
211 }
212
213 pub fn materialized_session_graph(&self) -> crate::SessionGraph {
214 self.session_graph().clone()
215 }
216
217 pub fn messages(&self) -> &[crate::Message] {
218 self.0.read_model.messages.as_slice()
219 }
220
221 pub fn active_events(&self) -> &[crate::SessionEventRecord] {
222 self.0.read_model.active_events.as_slice()
223 }
224
225 pub fn chronological_projection(&self) -> crate::ChronologicalProjection {
226 crate::ChronologicalProjection::from_read_model(&self.0.read_model)
227 }
228
229 pub(crate) fn shared_chronological_projection(&self) -> Arc<crate::ChronologicalProjection> {
230 Arc::clone(self.0.chronological_projection.get_or_init(|| {
231 Arc::new(crate::ChronologicalProjection::from_read_model(
232 &self.0.read_model,
233 ))
234 }))
235 }
236
237 pub fn message_tree(&self) -> Vec<crate::SessionMessageTreeNode> {
238 self.session_graph().message_tree()
239 }
240
241 pub fn turn_index(&self) -> usize {
242 self.0.meta.turn_index
243 }
244
245 pub fn token_usage(&self) -> &crate::TokenUsage {
246 &self.0.meta.token_usage
247 }
248
249 pub fn last_prompt_usage(&self) -> Option<&crate::runtime::PromptUsage> {
250 self.0.meta.last_prompt_usage.as_ref()
251 }
252
253 pub fn protocol_turn_options(&self) -> &crate::ProtocolTurnOptions {
254 &self.0.meta.protocol_turn_options
255 }
256
257 pub fn to_snapshot(&self) -> SessionSnapshot {
258 self.0.meta.to_snapshot(self.session_graph().clone())
259 }
260}
261
262#[derive(Clone)]
264pub struct TurnTransformContext<'run> {
265 pub session_id: String,
266 pub state: SessionReadView,
267 pub prompt_usage: Option<crate::runtime::PromptUsage>,
268 pub max_context_tokens: Option<usize>,
269 pub sessions: Arc<dyn super::SessionStateService>,
270 pub session_lifecycle: Arc<dyn super::SessionLifecycleService>,
271 pub session_graph: Arc<dyn super::SessionGraphService>,
272 pub scoped_effect_controller: crate::ScopedEffectController<'run>,
273 pub direct_completions: crate::DirectCompletionClient<'run>,
274}
275
276#[derive(Clone)]
278pub struct CompactionContext<'run> {
279 pub session_id: String,
280 pub instructions: Option<String>,
281 pub state: SessionReadView,
282 pub sessions: Arc<dyn super::SessionStateService>,
283 pub session_lifecycle: Arc<dyn super::SessionLifecycleService>,
284 pub session_graph: Arc<dyn super::SessionGraphService>,
285 pub scoped_effect_controller: crate::ScopedEffectController<'run>,
286}
287
288#[derive(Debug, thiserror::Error, Clone)]
289pub enum ContextError {
290 #[error("context pipeline error: {0}")]
291 Pipeline(String),
292 #[error("context session error: {0}")]
293 Session(String),
294}
295
296impl From<PluginError> for ContextError {
297 fn from(value: PluginError) -> Self {
298 Self::Session(value.to_string())
299 }
300}
301
302#[derive(Clone, Debug, Default)]
303pub struct ContextCompaction {
304 pub initial_nodes: Vec<crate::SessionAppendNode>,
305}
306
307impl ContextCompaction {
308 pub fn new(initial_nodes: Vec<crate::SessionAppendNode>) -> Self {
309 Self { initial_nodes }
310 }
311
312 pub fn is_empty(&self) -> bool {
313 self.initial_nodes.is_empty()
314 }
315}
316
317#[async_trait::async_trait]
319pub trait TurnContextTransform: Send + Sync {
320 fn id(&self) -> &'static str;
321 async fn transform(
322 &self,
323 ctx: &TurnTransformContext<'_>,
324 input: crate::session_model::context::PreparedContext,
325 ) -> Result<crate::session_model::context::PreparedContext, ContextError>;
326}
327
328#[async_trait::async_trait]
330pub trait ContextCompactor: Send + Sync {
331 fn id(&self) -> &'static str;
332 async fn compact(
333 &self,
334 ctx: &CompactionContext<'_>,
335 ) -> Result<Option<ContextCompaction>, ContextError>;
336}