Skip to main content

lash_core/plugin/
history.rs

1//! History rewriting and turn-context transform plugin contracts.
2//!
3//! Split out of `plugin/mod.rs` purely for file size. All types keep
4//! their original module path via `pub use` in `plugin/mod.rs`.
5
6use std::sync::{Arc, OnceLock};
7
8use crate::runtime::RuntimeSessionState;
9use crate::{SessionPolicy, SessionSnapshot};
10
11use super::PluginError;
12
13/// Reason the history pipeline is being invoked.
14#[derive(Clone, Debug)]
15pub enum RewriteTrigger {
16    /// User invoked `/compact` (or an equivalent plugin command).
17    Manual { instructions: Option<String> },
18    /// Session config changed to a smaller context window.
19    WindowShrink {
20        old_max: Option<usize>,
21        new_max: Option<usize>,
22    },
23    /// Reserved for future scheduled compactors — not fired by any call
24    /// site today.
25    Periodic,
26}
27
28/// Metadata accumulated as a history rewrite pipeline runs.
29#[derive(Clone, Debug, Default)]
30pub struct HistoryRewriteMetadata {
31    pub summarized_token_count: Option<u64>,
32    pub pruned_message_count: u32,
33    pub produced_summary: bool,
34}
35
36/// Mutable state passed through the history rewrite pipeline.
37#[derive(Clone, Debug)]
38pub struct HistoryState {
39    pub messages: Vec<crate::Message>,
40    pub metadata: HistoryRewriteMetadata,
41}
42
43impl HistoryState {
44    pub fn from_snapshot(snapshot: &SessionSnapshot) -> Self {
45        let read_view = snapshot.read_view();
46        Self {
47            messages: read_view.messages().to_vec(),
48            metadata: HistoryRewriteMetadata::default(),
49        }
50    }
51}
52
53#[derive(Clone, Debug)]
54pub struct SessionReadView(Arc<SessionReadState>);
55
56#[derive(Debug)]
57struct SessionReadState {
58    meta: SessionReadMeta,
59    graph: SessionReadGraph,
60    read_model: crate::session_graph::SessionReadModel,
61    chronological_projection: OnceLock<Arc<crate::ChronologicalProjection>>,
62}
63
64#[derive(Clone, Debug)]
65struct SessionReadMeta {
66    session_id: String,
67    policy: SessionPolicy,
68    turn_index: usize,
69    token_usage: crate::TokenUsage,
70    last_prompt_usage: Option<crate::runtime::PromptUsage>,
71    protocol_turn_options: crate::ProtocolTurnOptions,
72}
73
74impl SessionReadMeta {
75    fn from_snapshot_ref(snapshot: &SessionSnapshot) -> Self {
76        Self {
77            session_id: snapshot.session_id.clone(),
78            policy: snapshot.policy.clone(),
79            turn_index: snapshot.turn_index,
80            token_usage: snapshot.token_usage.clone(),
81            last_prompt_usage: snapshot.last_prompt_usage.clone(),
82            protocol_turn_options: snapshot.protocol_turn_options.clone(),
83        }
84    }
85
86    fn from_persisted_ref(state: &RuntimeSessionState) -> Self {
87        Self {
88            session_id: state.session_id.clone(),
89            policy: state.policy.clone(),
90            turn_index: state.turn_index,
91            token_usage: state.token_usage.clone(),
92            last_prompt_usage: state.last_prompt_usage.clone(),
93            protocol_turn_options: state.protocol_turn_options.clone(),
94        }
95    }
96
97    fn with_policy(mut self, policy: SessionPolicy) -> Self {
98        self.policy = policy;
99        self
100    }
101
102    fn with_turn_index(mut self, turn_index: usize) -> Self {
103        self.turn_index = turn_index;
104        self
105    }
106
107    fn with_protocol_turn_options(
108        mut self,
109        protocol_turn_options: crate::ProtocolTurnOptions,
110    ) -> Self {
111        self.protocol_turn_options = protocol_turn_options;
112        self
113    }
114
115    fn to_snapshot(&self, session_graph: crate::SessionGraph) -> SessionSnapshot {
116        SessionSnapshot {
117            session_id: self.session_id.clone(),
118            policy: self.policy.clone(),
119            agent_frames: Vec::new(),
120            current_agent_frame_id: String::new(),
121            session_graph,
122            turn_index: self.turn_index,
123            token_usage: self.token_usage.clone(),
124            last_prompt_usage: self.last_prompt_usage.clone(),
125            protocol_turn_options: self.protocol_turn_options.clone(),
126            tool_state_ref: None,
127            tool_state_generation: None,
128            plugin_snapshot_ref: None,
129            plugin_snapshot_revision: None,
130            execution_state_ref: None,
131            token_ledger: Vec::new(),
132            checkpoint_ref: None,
133        }
134    }
135}
136
137#[derive(Debug)]
138enum SessionReadGraph {
139    Owned(crate::SessionGraph),
140    Derived {
141        cache: OnceLock<crate::SessionGraph>,
142        base_graph: Arc<crate::SessionGraph>,
143    },
144}
145
146impl SessionReadView {
147    fn from_graph_message_sequence_meta(
148        meta: SessionReadMeta,
149        base_graph: Arc<crate::SessionGraph>,
150        messages: crate::MessageSequence,
151        active_events: Arc<Vec<crate::SessionEventRecord>>,
152    ) -> Self {
153        Self(Arc::new(SessionReadState {
154            meta,
155            graph: SessionReadGraph::Derived {
156                cache: OnceLock::new(),
157                base_graph,
158            },
159            read_model: crate::session_graph::SessionReadModel {
160                active_events,
161                messages: messages.shared(),
162                prompt_render_cache: Arc::new(crate::BaseRenderCache::new()),
163            },
164            chronological_projection: OnceLock::new(),
165        }))
166    }
167
168    pub fn from_snapshot(snapshot: &SessionSnapshot) -> Self {
169        let read_model = snapshot.read_model();
170        Self(Arc::new(SessionReadState {
171            meta: SessionReadMeta::from_snapshot_ref(snapshot),
172            graph: SessionReadGraph::Owned(snapshot.session_graph.clone()),
173            read_model,
174            chronological_projection: OnceLock::new(),
175        }))
176    }
177
178    pub fn from_persisted_state(state: &RuntimeSessionState) -> Self {
179        let graph = state.session_graph.clone();
180        let read_model = state.read_model();
181        Self(Arc::new(SessionReadState {
182            meta: SessionReadMeta::from_persisted_ref(state),
183            graph: SessionReadGraph::Owned(graph),
184            read_model,
185            chronological_projection: OnceLock::new(),
186        }))
187    }
188
189    pub(crate) fn from_runtime_state(
190        state: &RuntimeSessionState,
191        policy: SessionPolicy,
192        protocol_turn_options: crate::ProtocolTurnOptions,
193    ) -> Self {
194        let graph = state.session_graph.clone();
195        let read_model = state.read_model();
196        Self(Arc::new(SessionReadState {
197            meta: SessionReadMeta::from_persisted_ref(state)
198                .with_policy(policy)
199                .with_protocol_turn_options(protocol_turn_options),
200            graph: SessionReadGraph::Owned(graph),
201            read_model,
202            chronological_projection: OnceLock::new(),
203        }))
204    }
205
206    pub(crate) fn derived_from_persisted_state(
207        state: &RuntimeSessionState,
208        policy: SessionPolicy,
209        turn_index: usize,
210        protocol_turn_options: crate::ProtocolTurnOptions,
211        base_graph: Arc<crate::SessionGraph>,
212        messages: crate::MessageSequence,
213    ) -> Self {
214        let active_events = base_graph
215            .read_model_for_agent_frame(
216                &state.current_agent_frame_id,
217                state
218                    .current_agent_frame()
219                    .map(|frame| frame.previous_frame_id.is_none())
220                    .unwrap_or(true),
221            )
222            .active_events;
223        Self::from_graph_message_sequence_meta(
224            SessionReadMeta::from_persisted_ref(state)
225                .with_policy(policy)
226                .with_turn_index(turn_index)
227                .with_protocol_turn_options(protocol_turn_options),
228            base_graph,
229            messages,
230            active_events,
231        )
232    }
233
234    pub fn session_graph(&self) -> &crate::SessionGraph {
235        match &self.0.graph {
236            SessionReadGraph::Owned(graph) => graph,
237            SessionReadGraph::Derived { cache, base_graph } => cache.get_or_init(|| {
238                let mut graph = (**base_graph).clone();
239                graph.replace_active_read_state(self.0.read_model.messages.as_slice());
240                graph
241            }),
242        }
243    }
244
245    pub fn session_id(&self) -> &str {
246        &self.0.meta.session_id
247    }
248
249    pub fn policy(&self) -> &SessionPolicy {
250        &self.0.meta.policy
251    }
252
253    pub fn materialized_session_graph(&self) -> crate::SessionGraph {
254        self.session_graph().clone()
255    }
256
257    pub fn messages(&self) -> &[crate::Message] {
258        self.0.read_model.messages.as_slice()
259    }
260
261    pub fn active_events(&self) -> &[crate::SessionEventRecord] {
262        self.0.read_model.active_events.as_slice()
263    }
264
265    pub fn chronological_projection(&self) -> crate::ChronologicalProjection {
266        crate::ChronologicalProjection::from_read_model(&self.0.read_model)
267    }
268
269    pub(crate) fn shared_chronological_projection(&self) -> Arc<crate::ChronologicalProjection> {
270        Arc::clone(self.0.chronological_projection.get_or_init(|| {
271            Arc::new(crate::ChronologicalProjection::from_read_model(
272                &self.0.read_model,
273            ))
274        }))
275    }
276
277    pub fn message_tree(&self) -> Vec<crate::SessionMessageTreeNode> {
278        self.session_graph().message_tree()
279    }
280
281    pub fn turn_index(&self) -> usize {
282        self.0.meta.turn_index
283    }
284
285    pub fn token_usage(&self) -> &crate::TokenUsage {
286        &self.0.meta.token_usage
287    }
288
289    pub fn last_prompt_usage(&self) -> Option<&crate::runtime::PromptUsage> {
290        self.0.meta.last_prompt_usage.as_ref()
291    }
292
293    pub fn protocol_turn_options(&self) -> &crate::ProtocolTurnOptions {
294        &self.0.meta.protocol_turn_options
295    }
296
297    pub fn to_snapshot(&self) -> SessionSnapshot {
298        self.0.meta.to_snapshot(self.session_graph().clone())
299    }
300}
301
302/// Context passed to a turn-context transform.
303#[derive(Clone)]
304pub struct TurnTransformContext<'run> {
305    pub session_id: String,
306    pub state: SessionReadView,
307    pub prompt_usage: Option<crate::runtime::PromptUsage>,
308    pub max_context_tokens: Option<usize>,
309    pub sessions: Arc<dyn super::SessionStateService>,
310    pub session_lifecycle: Arc<dyn super::SessionLifecycleService>,
311    pub session_graph: Arc<dyn super::SessionGraphService>,
312    pub scoped_effect_controller: crate::ScopedEffectController<'run>,
313    pub direct_completions: crate::DirectCompletionClient<'run>,
314}
315
316/// Context passed to a history rewriter.
317#[derive(Clone)]
318pub struct RewriteContext<'run> {
319    pub session_id: String,
320    pub trigger: RewriteTrigger,
321    pub state: SessionReadView,
322    pub sessions: Arc<dyn super::SessionStateService>,
323    pub session_lifecycle: Arc<dyn super::SessionLifecycleService>,
324    pub session_graph: Arc<dyn super::SessionGraphService>,
325    pub scoped_effect_controller: crate::ScopedEffectController<'run>,
326}
327
328#[derive(Debug, thiserror::Error, Clone)]
329pub enum HistoryError {
330    #[error("history pipeline error: {0}")]
331    Pipeline(String),
332    #[error("history session error: {0}")]
333    Session(String),
334}
335
336impl From<PluginError> for HistoryError {
337    fn from(value: PluginError) -> Self {
338        Self::Session(value.to_string())
339    }
340}
341
342/// Prepares the ephemeral turn context presented to the model.
343#[async_trait::async_trait]
344pub trait TurnContextTransform: Send + Sync {
345    fn id(&self) -> &'static str;
346    async fn transform(
347        &self,
348        ctx: &TurnTransformContext<'_>,
349        input: crate::session_model::context::PreparedContext,
350    ) -> Result<crate::session_model::context::PreparedContext, HistoryError>;
351}
352
353/// Performs a permanent transform on persisted history (compaction,
354/// overflow recovery, manual `/compact`, ...).
355#[async_trait::async_trait]
356pub trait HistoryRewriter: Send + Sync {
357    fn id(&self) -> &'static str;
358    fn accepts(&self, _trigger: &RewriteTrigger) -> bool {
359        true
360    }
361    async fn rewrite(
362        &self,
363        ctx: &RewriteContext<'_>,
364        input: HistoryState,
365    ) -> Result<HistoryState, HistoryError>;
366}