1use std::sync::{Arc, OnceLock};
7
8use crate::runtime::RuntimeSessionState;
9use crate::{SessionPolicy, SessionSnapshot};
10
11use super::PluginError;
12
/// Reason a history rewrite is being requested.
///
/// A [`HistoryRewriter`] receives the trigger through its `accepts` hook and
/// can use it to opt in or out of a given rewrite.
///
/// NOTE(review): the code that raises these triggers is not in this file, so
/// the per-variant descriptions below are inferred from names — confirm.
#[derive(Debug, Clone)]
pub enum RewriteTrigger {
    /// An explicitly requested rewrite, optionally carrying instruction text.
    Manual { instructions: Option<String> },
    /// Presumably raised when a size limit changed from `old_max` to
    /// `new_max` — TODO confirm the unit and what `None` stands for.
    WindowShrink {
        old_max: Option<usize>,
        new_max: Option<usize>,
    },
    /// Presumably a recurring/scheduled rewrite — TODO confirm.
    Periodic,
}
27
/// Bookkeeping carried alongside a rewritten history.
///
/// `Default` yields `None` / `0` / `false`, which is what
/// `HistoryState::from_snapshot` starts from.
#[derive(Debug, Clone, Default)]
pub struct HistoryRewriteMetadata {
    /// Presumably the number of tokens that were condensed into a summary,
    /// when known — TODO confirm against the rewriters that set it.
    pub summarized_token_count: Option<u64>,
    /// Presumably the number of messages dropped by the rewrite — TODO confirm.
    pub pruned_message_count: u32,
    /// Presumably set when the rewrite emitted a summary — TODO confirm.
    pub produced_summary: bool,
}
35
36#[derive(Clone, Debug)]
38pub struct HistoryState {
39 pub messages: Vec<crate::Message>,
40 pub tool_calls: Vec<crate::ToolCallRecord>,
41 pub metadata: HistoryRewriteMetadata,
42}
43
44impl HistoryState {
45 pub fn from_snapshot(snapshot: &SessionSnapshot) -> Self {
46 let read_view = snapshot.read_view();
47 Self {
48 messages: read_view.messages().to_vec(),
49 tool_calls: read_view.tool_calls().to_vec(),
50 metadata: HistoryRewriteMetadata::default(),
51 }
52 }
53}
54
55#[derive(Clone, Debug)]
56pub struct SessionReadView(Arc<SessionReadState>);
57
58#[derive(Debug)]
59struct SessionReadState {
60 meta: SessionReadMeta,
61 graph: SessionReadGraph,
62 read_model: crate::session_graph::SessionReadModel,
63 chronological_projection: OnceLock<Arc<crate::ChronologicalProjection>>,
64}
65
66#[derive(Clone, Debug)]
67struct SessionReadMeta {
68 session_id: String,
69 policy: SessionPolicy,
70 turn_index: usize,
71 token_usage: crate::TokenUsage,
72 last_prompt_usage: Option<crate::runtime::PromptUsage>,
73 protocol_turn_options: crate::ProtocolTurnOptions,
74}
75
76impl SessionReadMeta {
77 fn from_snapshot_ref(snapshot: &SessionSnapshot) -> Self {
78 Self {
79 session_id: snapshot.session_id.clone(),
80 policy: snapshot.policy.clone(),
81 turn_index: snapshot.turn_index,
82 token_usage: snapshot.token_usage.clone(),
83 last_prompt_usage: snapshot.last_prompt_usage.clone(),
84 protocol_turn_options: snapshot.protocol_turn_options.clone(),
85 }
86 }
87
88 fn from_persisted_ref(state: &RuntimeSessionState) -> Self {
89 Self {
90 session_id: state.session_id.clone(),
91 policy: state.policy.clone(),
92 turn_index: state.turn_index,
93 token_usage: state.token_usage.clone(),
94 last_prompt_usage: state.last_prompt_usage.clone(),
95 protocol_turn_options: state.protocol_turn_options.clone(),
96 }
97 }
98
99 fn with_policy(mut self, policy: SessionPolicy) -> Self {
100 self.policy = policy;
101 self
102 }
103
104 fn with_turn_index(mut self, turn_index: usize) -> Self {
105 self.turn_index = turn_index;
106 self
107 }
108
109 fn with_protocol_turn_options(
110 mut self,
111 protocol_turn_options: crate::ProtocolTurnOptions,
112 ) -> Self {
113 self.protocol_turn_options = protocol_turn_options;
114 self
115 }
116
117 fn to_snapshot(&self, session_graph: crate::SessionGraph) -> SessionSnapshot {
118 SessionSnapshot {
119 session_id: self.session_id.clone(),
120 policy: self.policy.clone(),
121 agent_frames: Vec::new(),
122 current_agent_frame_id: String::new(),
123 session_graph,
124 turn_index: self.turn_index,
125 token_usage: self.token_usage.clone(),
126 last_prompt_usage: self.last_prompt_usage.clone(),
127 protocol_turn_options: self.protocol_turn_options.clone(),
128 tool_state_ref: None,
129 tool_state_generation: None,
130 plugin_snapshot_ref: None,
131 plugin_snapshot_revision: None,
132 execution_state_ref: None,
133 token_ledger: Vec::new(),
134 checkpoint_ref: None,
135 }
136 }
137}
138
139#[derive(Debug)]
140enum SessionReadGraph {
141 Owned(crate::SessionGraph),
142 Derived {
143 cache: OnceLock<crate::SessionGraph>,
144 base_graph: Arc<crate::SessionGraph>,
145 },
146}
147
148impl SessionReadView {
149 fn from_graph_message_sequence_meta(
150 meta: SessionReadMeta,
151 base_graph: Arc<crate::SessionGraph>,
152 messages: crate::MessageSequence,
153 tool_calls: Arc<Vec<crate::ToolCallRecord>>,
154 active_events: Arc<Vec<crate::SessionEventRecord>>,
155 ) -> Self {
156 Self(Arc::new(SessionReadState {
157 meta,
158 graph: SessionReadGraph::Derived {
159 cache: OnceLock::new(),
160 base_graph,
161 },
162 read_model: crate::session_graph::SessionReadModel {
163 active_events,
164 messages: messages.shared(),
165 tool_calls,
166 prompt_render_cache: Arc::new(crate::BaseRenderCache::new()),
167 },
168 chronological_projection: OnceLock::new(),
169 }))
170 }
171
172 pub fn from_snapshot(snapshot: &SessionSnapshot) -> Self {
173 let read_model = snapshot.read_model();
174 Self(Arc::new(SessionReadState {
175 meta: SessionReadMeta::from_snapshot_ref(snapshot),
176 graph: SessionReadGraph::Owned(snapshot.session_graph.clone()),
177 read_model,
178 chronological_projection: OnceLock::new(),
179 }))
180 }
181
182 pub fn from_persisted_state(state: &RuntimeSessionState) -> Self {
183 let graph = state.session_graph.clone();
184 let read_model = state.read_model();
185 Self(Arc::new(SessionReadState {
186 meta: SessionReadMeta::from_persisted_ref(state),
187 graph: SessionReadGraph::Owned(graph),
188 read_model,
189 chronological_projection: OnceLock::new(),
190 }))
191 }
192
193 pub(crate) fn from_runtime_state(
194 state: &RuntimeSessionState,
195 policy: SessionPolicy,
196 protocol_turn_options: crate::ProtocolTurnOptions,
197 ) -> Self {
198 let graph = state.session_graph.clone();
199 let read_model = state.read_model();
200 Self(Arc::new(SessionReadState {
201 meta: SessionReadMeta::from_persisted_ref(state)
202 .with_policy(policy)
203 .with_protocol_turn_options(protocol_turn_options),
204 graph: SessionReadGraph::Owned(graph),
205 read_model,
206 chronological_projection: OnceLock::new(),
207 }))
208 }
209
210 pub(crate) fn derived_from_persisted_state(
211 state: &RuntimeSessionState,
212 policy: SessionPolicy,
213 turn_index: usize,
214 protocol_turn_options: crate::ProtocolTurnOptions,
215 base_graph: Arc<crate::SessionGraph>,
216 messages: crate::MessageSequence,
217 tool_calls: Arc<Vec<crate::ToolCallRecord>>,
218 ) -> Self {
219 let active_events = base_graph
220 .read_model_for_agent_frame(
221 &state.current_agent_frame_id,
222 state
223 .current_agent_frame()
224 .map(|frame| frame.previous_frame_id.is_none())
225 .unwrap_or(true),
226 )
227 .active_events;
228 Self::from_graph_message_sequence_meta(
229 SessionReadMeta::from_persisted_ref(state)
230 .with_policy(policy)
231 .with_turn_index(turn_index)
232 .with_protocol_turn_options(protocol_turn_options),
233 base_graph,
234 messages,
235 tool_calls,
236 active_events,
237 )
238 }
239
240 pub fn session_graph(&self) -> &crate::SessionGraph {
241 match &self.0.graph {
242 SessionReadGraph::Owned(graph) => graph,
243 SessionReadGraph::Derived { cache, base_graph } => cache.get_or_init(|| {
244 let mut graph = (**base_graph).clone();
245 graph.replace_active_read_state(
246 self.0.read_model.messages.as_slice(),
247 self.0.read_model.tool_calls.as_slice(),
248 );
249 graph
250 }),
251 }
252 }
253
254 pub fn session_id(&self) -> &str {
255 &self.0.meta.session_id
256 }
257
258 pub fn policy(&self) -> &SessionPolicy {
259 &self.0.meta.policy
260 }
261
262 pub fn materialized_session_graph(&self) -> crate::SessionGraph {
263 self.session_graph().clone()
264 }
265
266 pub fn messages(&self) -> &[crate::Message] {
267 self.0.read_model.messages.as_slice()
268 }
269
270 pub fn tool_calls(&self) -> &[crate::ToolCallRecord] {
271 self.0.read_model.tool_calls.as_slice()
272 }
273
274 pub fn active_events(&self) -> &[crate::SessionEventRecord] {
275 self.0.read_model.active_events.as_slice()
276 }
277
278 pub fn chronological_projection(&self) -> crate::ChronologicalProjection {
279 crate::ChronologicalProjection::from_read_model(&self.0.read_model)
280 }
281
282 pub(crate) fn shared_chronological_projection(&self) -> Arc<crate::ChronologicalProjection> {
283 Arc::clone(self.0.chronological_projection.get_or_init(|| {
284 Arc::new(crate::ChronologicalProjection::from_read_model(
285 &self.0.read_model,
286 ))
287 }))
288 }
289
290 pub fn message_tree(&self) -> Vec<crate::SessionMessageTreeNode> {
291 self.session_graph().message_tree()
292 }
293
294 pub fn turn_index(&self) -> usize {
295 self.0.meta.turn_index
296 }
297
298 pub fn token_usage(&self) -> &crate::TokenUsage {
299 &self.0.meta.token_usage
300 }
301
302 pub fn last_prompt_usage(&self) -> Option<&crate::runtime::PromptUsage> {
303 self.0.meta.last_prompt_usage.as_ref()
304 }
305
306 pub fn protocol_turn_options(&self) -> &crate::ProtocolTurnOptions {
307 &self.0.meta.protocol_turn_options
308 }
309
310 pub fn to_snapshot(&self) -> SessionSnapshot {
311 self.0.meta.to_snapshot(self.session_graph().clone())
312 }
313}
314
315#[derive(Clone)]
317pub struct TurnTransformContext<'run> {
318 pub session_id: String,
319 pub state: SessionReadView,
320 pub prompt_usage: Option<crate::runtime::PromptUsage>,
321 pub max_context_tokens: Option<usize>,
322 pub sessions: Arc<dyn super::SessionStateService>,
323 pub session_lifecycle: Arc<dyn super::SessionLifecycleService>,
324 pub session_graph: Arc<dyn super::SessionGraphService>,
325 pub scoped_effect_controller: crate::ScopedEffectController<'run>,
326 pub direct_completions: crate::DirectCompletionClient<'run>,
327}
328
329#[derive(Clone)]
331pub struct RewriteContext<'run> {
332 pub session_id: String,
333 pub trigger: RewriteTrigger,
334 pub state: SessionReadView,
335 pub sessions: Arc<dyn super::SessionStateService>,
336 pub session_lifecycle: Arc<dyn super::SessionLifecycleService>,
337 pub session_graph: Arc<dyn super::SessionGraphService>,
338 pub scoped_effect_controller: crate::ScopedEffectController<'run>,
339}
340
341#[derive(Debug, thiserror::Error, Clone)]
342pub enum HistoryError {
343 #[error("history pipeline error: {0}")]
344 Pipeline(String),
345 #[error("history session error: {0}")]
346 Session(String),
347}
348
349impl From<PluginError> for HistoryError {
350 fn from(value: PluginError) -> Self {
351 Self::Session(value.to_string())
352 }
353}
354
355#[async_trait::async_trait]
357pub trait TurnContextTransform: Send + Sync {
358 fn id(&self) -> &'static str;
359 async fn transform(
360 &self,
361 ctx: &TurnTransformContext<'_>,
362 input: crate::session_model::context::PreparedContext,
363 ) -> Result<crate::session_model::context::PreparedContext, HistoryError>;
364}
365
366#[async_trait::async_trait]
369pub trait HistoryRewriter: Send + Sync {
370 fn id(&self) -> &'static str;
371 fn accepts(&self, _trigger: &RewriteTrigger) -> bool {
372 true
373 }
374 async fn rewrite(
375 &self,
376 ctx: &RewriteContext<'_>,
377 input: HistoryState,
378 ) -> Result<HistoryState, HistoryError>;
379}