1use std::sync::{Arc, OnceLock};
7
8use crate::runtime::RuntimeSessionState;
9use crate::{SessionPolicy, SessionSnapshot};
10
11use super::PluginError;
12
#[derive(Debug, Clone)]
/// Reason a history rewrite is being requested.
///
/// Carried on [`RewriteContext::trigger`] and offered to
/// [`HistoryRewriter::accepts`] so a rewriter can opt in or out per trigger.
pub enum RewriteTrigger {
    /// Rewrite requested explicitly, optionally with free-form instructions.
    Manual { instructions: Option<String> },
    /// Rewrite tied to a change from `old_max` to `new_max`.
    ///
    /// NOTE(review): presumably these are context-window limits and `None`
    /// means "no limit" — confirm against the code that emits this variant.
    WindowShrink {
        old_max: Option<usize>,
        new_max: Option<usize>,
    },
    /// Payload-free trigger; by its name, a recurring/scheduled rewrite
    /// (the scheduling itself is not visible in this module).
    Periodic,
}
27
#[derive(Debug, Clone, Default)]
/// Bookkeeping that travels with a [`HistoryState`] through a rewrite.
///
/// The derived `Default` yields `None`, `0` and `false` respectively.
pub struct HistoryRewriteMetadata {
    /// Token count attributed to summarization, when one is known.
    pub summarized_token_count: Option<u64>,
    /// Count of pruned messages.
    pub pruned_message_count: u32,
    /// Whether a summary was produced.
    pub produced_summary: bool,
}
35
36#[derive(Clone, Debug)]
38pub struct HistoryState {
39 pub messages: Vec<crate::Message>,
40 pub metadata: HistoryRewriteMetadata,
41}
42
43impl HistoryState {
44 pub fn from_snapshot(snapshot: &SessionSnapshot) -> Self {
45 let read_view = snapshot.read_view();
46 Self {
47 messages: read_view.messages().to_vec(),
48 metadata: HistoryRewriteMetadata::default(),
49 }
50 }
51}
52
53#[derive(Clone, Debug)]
54pub struct SessionReadView(Arc<SessionReadState>);
55
56#[derive(Debug)]
57struct SessionReadState {
58 meta: SessionReadMeta,
59 graph: SessionReadGraph,
60 read_model: crate::session_graph::SessionReadModel,
61 chronological_projection: OnceLock<Arc<crate::ChronologicalProjection>>,
62}
63
64#[derive(Clone, Debug)]
65struct SessionReadMeta {
66 session_id: String,
67 policy: SessionPolicy,
68 turn_index: usize,
69 token_usage: crate::TokenUsage,
70 last_prompt_usage: Option<crate::runtime::PromptUsage>,
71 protocol_turn_options: crate::ProtocolTurnOptions,
72}
73
74impl SessionReadMeta {
75 fn from_snapshot_ref(snapshot: &SessionSnapshot) -> Self {
76 Self {
77 session_id: snapshot.session_id.clone(),
78 policy: snapshot.policy.clone(),
79 turn_index: snapshot.turn_index,
80 token_usage: snapshot.token_usage.clone(),
81 last_prompt_usage: snapshot.last_prompt_usage.clone(),
82 protocol_turn_options: snapshot.protocol_turn_options.clone(),
83 }
84 }
85
86 fn from_persisted_ref(state: &RuntimeSessionState) -> Self {
87 Self {
88 session_id: state.session_id.clone(),
89 policy: state.policy.clone(),
90 turn_index: state.turn_index,
91 token_usage: state.token_usage.clone(),
92 last_prompt_usage: state.last_prompt_usage.clone(),
93 protocol_turn_options: state.protocol_turn_options.clone(),
94 }
95 }
96
97 fn with_policy(mut self, policy: SessionPolicy) -> Self {
98 self.policy = policy;
99 self
100 }
101
102 fn with_turn_index(mut self, turn_index: usize) -> Self {
103 self.turn_index = turn_index;
104 self
105 }
106
107 fn with_protocol_turn_options(
108 mut self,
109 protocol_turn_options: crate::ProtocolTurnOptions,
110 ) -> Self {
111 self.protocol_turn_options = protocol_turn_options;
112 self
113 }
114
115 fn to_snapshot(&self, session_graph: crate::SessionGraph) -> SessionSnapshot {
116 SessionSnapshot {
117 session_id: self.session_id.clone(),
118 policy: self.policy.clone(),
119 agent_frames: Vec::new(),
120 current_agent_frame_id: String::new(),
121 session_graph,
122 turn_index: self.turn_index,
123 token_usage: self.token_usage.clone(),
124 last_prompt_usage: self.last_prompt_usage.clone(),
125 protocol_turn_options: self.protocol_turn_options.clone(),
126 tool_state_ref: None,
127 tool_state_generation: None,
128 plugin_snapshot_ref: None,
129 plugin_snapshot_revision: None,
130 execution_state_ref: None,
131 token_ledger: Vec::new(),
132 checkpoint_ref: None,
133 }
134 }
135}
136
137#[derive(Debug)]
138enum SessionReadGraph {
139 Owned(crate::SessionGraph),
140 Derived {
141 cache: OnceLock<crate::SessionGraph>,
142 base_graph: Arc<crate::SessionGraph>,
143 },
144}
145
146impl SessionReadView {
147 fn from_graph_message_sequence_meta(
148 meta: SessionReadMeta,
149 base_graph: Arc<crate::SessionGraph>,
150 messages: crate::MessageSequence,
151 active_events: Arc<Vec<crate::SessionEventRecord>>,
152 ) -> Self {
153 Self(Arc::new(SessionReadState {
154 meta,
155 graph: SessionReadGraph::Derived {
156 cache: OnceLock::new(),
157 base_graph,
158 },
159 read_model: crate::session_graph::SessionReadModel {
160 active_events,
161 messages: messages.shared(),
162 prompt_render_cache: Arc::new(crate::BaseRenderCache::new()),
163 },
164 chronological_projection: OnceLock::new(),
165 }))
166 }
167
168 pub fn from_snapshot(snapshot: &SessionSnapshot) -> Self {
169 let read_model = snapshot.read_model();
170 Self(Arc::new(SessionReadState {
171 meta: SessionReadMeta::from_snapshot_ref(snapshot),
172 graph: SessionReadGraph::Owned(snapshot.session_graph.clone()),
173 read_model,
174 chronological_projection: OnceLock::new(),
175 }))
176 }
177
178 pub fn from_persisted_state(state: &RuntimeSessionState) -> Self {
179 let graph = state.session_graph.clone();
180 let read_model = state.read_model();
181 Self(Arc::new(SessionReadState {
182 meta: SessionReadMeta::from_persisted_ref(state),
183 graph: SessionReadGraph::Owned(graph),
184 read_model,
185 chronological_projection: OnceLock::new(),
186 }))
187 }
188
189 pub(crate) fn from_runtime_state(
190 state: &RuntimeSessionState,
191 policy: SessionPolicy,
192 protocol_turn_options: crate::ProtocolTurnOptions,
193 ) -> Self {
194 let graph = state.session_graph.clone();
195 let read_model = state.read_model();
196 Self(Arc::new(SessionReadState {
197 meta: SessionReadMeta::from_persisted_ref(state)
198 .with_policy(policy)
199 .with_protocol_turn_options(protocol_turn_options),
200 graph: SessionReadGraph::Owned(graph),
201 read_model,
202 chronological_projection: OnceLock::new(),
203 }))
204 }
205
206 pub(crate) fn derived_from_persisted_state(
207 state: &RuntimeSessionState,
208 policy: SessionPolicy,
209 turn_index: usize,
210 protocol_turn_options: crate::ProtocolTurnOptions,
211 base_graph: Arc<crate::SessionGraph>,
212 messages: crate::MessageSequence,
213 ) -> Self {
214 let active_events = base_graph
215 .read_model_for_agent_frame(
216 &state.current_agent_frame_id,
217 state
218 .current_agent_frame()
219 .map(|frame| frame.previous_frame_id.is_none())
220 .unwrap_or(true),
221 )
222 .active_events;
223 Self::from_graph_message_sequence_meta(
224 SessionReadMeta::from_persisted_ref(state)
225 .with_policy(policy)
226 .with_turn_index(turn_index)
227 .with_protocol_turn_options(protocol_turn_options),
228 base_graph,
229 messages,
230 active_events,
231 )
232 }
233
234 pub fn session_graph(&self) -> &crate::SessionGraph {
235 match &self.0.graph {
236 SessionReadGraph::Owned(graph) => graph,
237 SessionReadGraph::Derived { cache, base_graph } => cache.get_or_init(|| {
238 let mut graph = (**base_graph).clone();
239 graph.replace_active_read_state(self.0.read_model.messages.as_slice());
240 graph
241 }),
242 }
243 }
244
245 pub fn session_id(&self) -> &str {
246 &self.0.meta.session_id
247 }
248
249 pub fn policy(&self) -> &SessionPolicy {
250 &self.0.meta.policy
251 }
252
253 pub fn materialized_session_graph(&self) -> crate::SessionGraph {
254 self.session_graph().clone()
255 }
256
257 pub fn messages(&self) -> &[crate::Message] {
258 self.0.read_model.messages.as_slice()
259 }
260
261 pub fn active_events(&self) -> &[crate::SessionEventRecord] {
262 self.0.read_model.active_events.as_slice()
263 }
264
265 pub fn chronological_projection(&self) -> crate::ChronologicalProjection {
266 crate::ChronologicalProjection::from_read_model(&self.0.read_model)
267 }
268
269 pub(crate) fn shared_chronological_projection(&self) -> Arc<crate::ChronologicalProjection> {
270 Arc::clone(self.0.chronological_projection.get_or_init(|| {
271 Arc::new(crate::ChronologicalProjection::from_read_model(
272 &self.0.read_model,
273 ))
274 }))
275 }
276
277 pub fn message_tree(&self) -> Vec<crate::SessionMessageTreeNode> {
278 self.session_graph().message_tree()
279 }
280
281 pub fn turn_index(&self) -> usize {
282 self.0.meta.turn_index
283 }
284
285 pub fn token_usage(&self) -> &crate::TokenUsage {
286 &self.0.meta.token_usage
287 }
288
289 pub fn last_prompt_usage(&self) -> Option<&crate::runtime::PromptUsage> {
290 self.0.meta.last_prompt_usage.as_ref()
291 }
292
293 pub fn protocol_turn_options(&self) -> &crate::ProtocolTurnOptions {
294 &self.0.meta.protocol_turn_options
295 }
296
297 pub fn to_snapshot(&self) -> SessionSnapshot {
298 self.0.meta.to_snapshot(self.session_graph().clone())
299 }
300}
301
302#[derive(Clone)]
304pub struct TurnTransformContext<'run> {
305 pub session_id: String,
306 pub state: SessionReadView,
307 pub prompt_usage: Option<crate::runtime::PromptUsage>,
308 pub max_context_tokens: Option<usize>,
309 pub sessions: Arc<dyn super::SessionStateService>,
310 pub session_lifecycle: Arc<dyn super::SessionLifecycleService>,
311 pub session_graph: Arc<dyn super::SessionGraphService>,
312 pub scoped_effect_controller: crate::ScopedEffectController<'run>,
313 pub direct_completions: crate::DirectCompletionClient<'run>,
314}
315
316#[derive(Clone)]
318pub struct RewriteContext<'run> {
319 pub session_id: String,
320 pub trigger: RewriteTrigger,
321 pub state: SessionReadView,
322 pub sessions: Arc<dyn super::SessionStateService>,
323 pub session_lifecycle: Arc<dyn super::SessionLifecycleService>,
324 pub session_graph: Arc<dyn super::SessionGraphService>,
325 pub scoped_effect_controller: crate::ScopedEffectController<'run>,
326}
327
328#[derive(Debug, thiserror::Error, Clone)]
329pub enum HistoryError {
330 #[error("history pipeline error: {0}")]
331 Pipeline(String),
332 #[error("history session error: {0}")]
333 Session(String),
334}
335
336impl From<PluginError> for HistoryError {
337 fn from(value: PluginError) -> Self {
338 Self::Session(value.to_string())
339 }
340}
341
342#[async_trait::async_trait]
344pub trait TurnContextTransform: Send + Sync {
345 fn id(&self) -> &'static str;
346 async fn transform(
347 &self,
348 ctx: &TurnTransformContext<'_>,
349 input: crate::session_model::context::PreparedContext,
350 ) -> Result<crate::session_model::context::PreparedContext, HistoryError>;
351}
352
353#[async_trait::async_trait]
356pub trait HistoryRewriter: Send + Sync {
357 fn id(&self) -> &'static str;
358 fn accepts(&self, _trigger: &RewriteTrigger) -> bool {
359 true
360 }
361 async fn rewrite(
362 &self,
363 ctx: &RewriteContext<'_>,
364 input: HistoryState,
365 ) -> Result<HistoryState, HistoryError>;
366}