Skip to main content

lash_core/plugin/
history.rs

1//! History rewriting and turn-context transform plugin contracts.
2//!
3//! Split out of `plugin/mod.rs` purely for file size. All types keep
4//! their original module path via `pub use` in `plugin/mod.rs`.
5
6use std::sync::{Arc, OnceLock};
7
8use crate::runtime::RuntimeSessionState;
9use crate::{SessionPolicy, SessionSnapshot};
10
11use super::PluginError;
12
/// Why the history rewrite pipeline is being run.
///
/// Carried in [`RewriteContext::trigger`]; a rewriter can also inspect it in
/// [`HistoryRewriter::accepts`] before deciding whether to take part.
#[derive(Clone, Debug)]
pub enum RewriteTrigger {
    /// The user asked for it via `/compact` (or an equivalent plugin
    /// command), optionally supplying free-form `instructions`.
    Manual { instructions: Option<String> },
    /// The session configuration moved to a smaller context window.
    WindowShrink {
        /// Maximum before the change.
        /// NOTE(review): meaning of `None` is not visible here — confirm.
        old_max: Option<usize>,
        /// Maximum after the change.
        /// NOTE(review): meaning of `None` is not visible here — confirm.
        new_max: Option<usize>,
    },
    /// Placeholder for future scheduled compactors; no call site fires
    /// this variant today.
    Periodic,
}
27
/// Bookkeeping gathered while a history rewrite pipeline runs.
///
/// The derived `Default` starts every field at its empty value
/// (`None` / `0` / `false`).
#[derive(Clone, Debug, Default)]
pub struct HistoryRewriteMetadata {
    /// Token count attributed to summarization, when one was recorded.
    /// NOTE(review): exact accounting is defined by the rewriters — confirm.
    pub summarized_token_count: Option<u64>,
    /// Number of messages pruned so far.
    /// NOTE(review): presumably cumulative across pipeline stages — confirm.
    pub pruned_message_count: u32,
    /// Whether a summary was produced.
    pub produced_summary: bool,
}
35
36/// Mutable state passed through the history rewrite pipeline.
37#[derive(Clone, Debug)]
38pub struct HistoryState {
39    pub messages: Vec<crate::Message>,
40    pub tool_calls: Vec<crate::ToolCallRecord>,
41    pub metadata: HistoryRewriteMetadata,
42}
43
44impl HistoryState {
45    pub fn from_snapshot(snapshot: &SessionSnapshot) -> Self {
46        let read_view = snapshot.read_view();
47        Self {
48            messages: read_view.messages().to_vec(),
49            tool_calls: read_view.tool_calls().to_vec(),
50            metadata: HistoryRewriteMetadata::default(),
51        }
52    }
53}
54
55#[derive(Clone, Debug)]
56pub struct SessionReadView(Arc<SessionReadState>);
57
58#[derive(Debug)]
59struct SessionReadState {
60    meta: SessionReadMeta,
61    graph: SessionReadGraph,
62    read_model: crate::session_graph::SessionReadModel,
63    chronological_projection: OnceLock<Arc<crate::ChronologicalProjection>>,
64}
65
66#[derive(Clone, Debug)]
67struct SessionReadMeta {
68    session_id: String,
69    policy: SessionPolicy,
70    turn_index: usize,
71    token_usage: crate::TokenUsage,
72    last_prompt_usage: Option<crate::runtime::PromptUsage>,
73    protocol_turn_options: crate::ProtocolTurnOptions,
74}
75
76impl SessionReadMeta {
77    fn from_snapshot_ref(snapshot: &SessionSnapshot) -> Self {
78        Self {
79            session_id: snapshot.session_id.clone(),
80            policy: snapshot.policy.clone(),
81            turn_index: snapshot.turn_index,
82            token_usage: snapshot.token_usage.clone(),
83            last_prompt_usage: snapshot.last_prompt_usage.clone(),
84            protocol_turn_options: snapshot.protocol_turn_options.clone(),
85        }
86    }
87
88    fn from_persisted_ref(state: &RuntimeSessionState) -> Self {
89        Self {
90            session_id: state.session_id.clone(),
91            policy: state.policy.clone(),
92            turn_index: state.turn_index,
93            token_usage: state.token_usage.clone(),
94            last_prompt_usage: state.last_prompt_usage.clone(),
95            protocol_turn_options: state.protocol_turn_options.clone(),
96        }
97    }
98
99    fn with_policy(mut self, policy: SessionPolicy) -> Self {
100        self.policy = policy;
101        self
102    }
103
104    fn with_turn_index(mut self, turn_index: usize) -> Self {
105        self.turn_index = turn_index;
106        self
107    }
108
109    fn with_protocol_turn_options(
110        mut self,
111        protocol_turn_options: crate::ProtocolTurnOptions,
112    ) -> Self {
113        self.protocol_turn_options = protocol_turn_options;
114        self
115    }
116
117    fn to_snapshot(&self, session_graph: crate::SessionGraph) -> SessionSnapshot {
118        SessionSnapshot {
119            session_id: self.session_id.clone(),
120            policy: self.policy.clone(),
121            agent_frames: Vec::new(),
122            current_agent_frame_id: String::new(),
123            session_graph,
124            turn_index: self.turn_index,
125            token_usage: self.token_usage.clone(),
126            last_prompt_usage: self.last_prompt_usage.clone(),
127            protocol_turn_options: self.protocol_turn_options.clone(),
128            tool_state_ref: None,
129            tool_state_generation: None,
130            plugin_snapshot_ref: None,
131            plugin_snapshot_revision: None,
132            execution_state_ref: None,
133            token_ledger: Vec::new(),
134            checkpoint_ref: None,
135        }
136    }
137}
138
139#[derive(Debug)]
140enum SessionReadGraph {
141    Owned(crate::SessionGraph),
142    Derived {
143        cache: OnceLock<crate::SessionGraph>,
144        base_graph: Arc<crate::SessionGraph>,
145    },
146}
147
148impl SessionReadView {
149    fn from_graph_message_sequence_meta(
150        meta: SessionReadMeta,
151        base_graph: Arc<crate::SessionGraph>,
152        messages: crate::MessageSequence,
153        tool_calls: Arc<Vec<crate::ToolCallRecord>>,
154        active_events: Arc<Vec<crate::SessionEventRecord>>,
155    ) -> Self {
156        Self(Arc::new(SessionReadState {
157            meta,
158            graph: SessionReadGraph::Derived {
159                cache: OnceLock::new(),
160                base_graph,
161            },
162            read_model: crate::session_graph::SessionReadModel {
163                active_events,
164                messages: messages.shared(),
165                tool_calls,
166                prompt_render_cache: Arc::new(crate::BaseRenderCache::new()),
167            },
168            chronological_projection: OnceLock::new(),
169        }))
170    }
171
172    pub fn from_snapshot(snapshot: &SessionSnapshot) -> Self {
173        let read_model = snapshot.read_model();
174        Self(Arc::new(SessionReadState {
175            meta: SessionReadMeta::from_snapshot_ref(snapshot),
176            graph: SessionReadGraph::Owned(snapshot.session_graph.clone()),
177            read_model,
178            chronological_projection: OnceLock::new(),
179        }))
180    }
181
182    pub fn from_persisted_state(state: &RuntimeSessionState) -> Self {
183        let graph = state.session_graph.clone();
184        let read_model = state.read_model();
185        Self(Arc::new(SessionReadState {
186            meta: SessionReadMeta::from_persisted_ref(state),
187            graph: SessionReadGraph::Owned(graph),
188            read_model,
189            chronological_projection: OnceLock::new(),
190        }))
191    }
192
193    pub(crate) fn from_runtime_state(
194        state: &RuntimeSessionState,
195        policy: SessionPolicy,
196        protocol_turn_options: crate::ProtocolTurnOptions,
197    ) -> Self {
198        let graph = state.session_graph.clone();
199        let read_model = state.read_model();
200        Self(Arc::new(SessionReadState {
201            meta: SessionReadMeta::from_persisted_ref(state)
202                .with_policy(policy)
203                .with_protocol_turn_options(protocol_turn_options),
204            graph: SessionReadGraph::Owned(graph),
205            read_model,
206            chronological_projection: OnceLock::new(),
207        }))
208    }
209
210    pub(crate) fn derived_from_persisted_state(
211        state: &RuntimeSessionState,
212        policy: SessionPolicy,
213        turn_index: usize,
214        protocol_turn_options: crate::ProtocolTurnOptions,
215        base_graph: Arc<crate::SessionGraph>,
216        messages: crate::MessageSequence,
217        tool_calls: Arc<Vec<crate::ToolCallRecord>>,
218    ) -> Self {
219        let active_events = base_graph
220            .read_model_for_agent_frame(
221                &state.current_agent_frame_id,
222                state
223                    .current_agent_frame()
224                    .map(|frame| frame.previous_frame_id.is_none())
225                    .unwrap_or(true),
226            )
227            .active_events;
228        Self::from_graph_message_sequence_meta(
229            SessionReadMeta::from_persisted_ref(state)
230                .with_policy(policy)
231                .with_turn_index(turn_index)
232                .with_protocol_turn_options(protocol_turn_options),
233            base_graph,
234            messages,
235            tool_calls,
236            active_events,
237        )
238    }
239
240    pub fn session_graph(&self) -> &crate::SessionGraph {
241        match &self.0.graph {
242            SessionReadGraph::Owned(graph) => graph,
243            SessionReadGraph::Derived { cache, base_graph } => cache.get_or_init(|| {
244                let mut graph = (**base_graph).clone();
245                graph.replace_active_read_state(
246                    self.0.read_model.messages.as_slice(),
247                    self.0.read_model.tool_calls.as_slice(),
248                );
249                graph
250            }),
251        }
252    }
253
254    pub fn session_id(&self) -> &str {
255        &self.0.meta.session_id
256    }
257
258    pub fn policy(&self) -> &SessionPolicy {
259        &self.0.meta.policy
260    }
261
262    pub fn materialized_session_graph(&self) -> crate::SessionGraph {
263        self.session_graph().clone()
264    }
265
266    pub fn messages(&self) -> &[crate::Message] {
267        self.0.read_model.messages.as_slice()
268    }
269
270    pub fn tool_calls(&self) -> &[crate::ToolCallRecord] {
271        self.0.read_model.tool_calls.as_slice()
272    }
273
274    pub fn active_events(&self) -> &[crate::SessionEventRecord] {
275        self.0.read_model.active_events.as_slice()
276    }
277
278    pub fn chronological_projection(&self) -> crate::ChronologicalProjection {
279        crate::ChronologicalProjection::from_read_model(&self.0.read_model)
280    }
281
282    pub(crate) fn shared_chronological_projection(&self) -> Arc<crate::ChronologicalProjection> {
283        Arc::clone(self.0.chronological_projection.get_or_init(|| {
284            Arc::new(crate::ChronologicalProjection::from_read_model(
285                &self.0.read_model,
286            ))
287        }))
288    }
289
290    pub fn message_tree(&self) -> Vec<crate::SessionMessageTreeNode> {
291        self.session_graph().message_tree()
292    }
293
294    pub fn turn_index(&self) -> usize {
295        self.0.meta.turn_index
296    }
297
298    pub fn token_usage(&self) -> &crate::TokenUsage {
299        &self.0.meta.token_usage
300    }
301
302    pub fn last_prompt_usage(&self) -> Option<&crate::runtime::PromptUsage> {
303        self.0.meta.last_prompt_usage.as_ref()
304    }
305
306    pub fn protocol_turn_options(&self) -> &crate::ProtocolTurnOptions {
307        &self.0.meta.protocol_turn_options
308    }
309
310    pub fn to_snapshot(&self) -> SessionSnapshot {
311        self.0.meta.to_snapshot(self.session_graph().clone())
312    }
313}
314
315/// Context passed to a turn-context transform.
316#[derive(Clone)]
317pub struct TurnTransformContext<'run> {
318    pub session_id: String,
319    pub state: SessionReadView,
320    pub prompt_usage: Option<crate::runtime::PromptUsage>,
321    pub max_context_tokens: Option<usize>,
322    pub sessions: Arc<dyn super::SessionStateService>,
323    pub session_lifecycle: Arc<dyn super::SessionLifecycleService>,
324    pub session_graph: Arc<dyn super::SessionGraphService>,
325    pub scoped_effect_controller: crate::ScopedEffectController<'run>,
326    pub direct_completions: crate::DirectCompletionClient<'run>,
327}
328
329/// Context passed to a history rewriter.
330#[derive(Clone)]
331pub struct RewriteContext<'run> {
332    pub session_id: String,
333    pub trigger: RewriteTrigger,
334    pub state: SessionReadView,
335    pub sessions: Arc<dyn super::SessionStateService>,
336    pub session_lifecycle: Arc<dyn super::SessionLifecycleService>,
337    pub session_graph: Arc<dyn super::SessionGraphService>,
338    pub scoped_effect_controller: crate::ScopedEffectController<'run>,
339}
340
341#[derive(Debug, thiserror::Error, Clone)]
342pub enum HistoryError {
343    #[error("history pipeline error: {0}")]
344    Pipeline(String),
345    #[error("history session error: {0}")]
346    Session(String),
347}
348
349impl From<PluginError> for HistoryError {
350    fn from(value: PluginError) -> Self {
351        Self::Session(value.to_string())
352    }
353}
354
355/// Prepares the ephemeral turn context presented to the model.
356#[async_trait::async_trait]
357pub trait TurnContextTransform: Send + Sync {
358    fn id(&self) -> &'static str;
359    async fn transform(
360        &self,
361        ctx: &TurnTransformContext<'_>,
362        input: crate::session_model::context::PreparedContext,
363    ) -> Result<crate::session_model::context::PreparedContext, HistoryError>;
364}
365
366/// Performs a permanent transform on persisted history (compaction,
367/// overflow recovery, manual `/compact`, ...).
368#[async_trait::async_trait]
369pub trait HistoryRewriter: Send + Sync {
370    fn id(&self) -> &'static str;
371    fn accepts(&self, _trigger: &RewriteTrigger) -> bool {
372        true
373    }
374    async fn rewrite(
375        &self,
376        ctx: &RewriteContext<'_>,
377        input: HistoryState,
378    ) -> Result<HistoryState, HistoryError>;
379}