Skip to main content

git_internal/internal/object/
pipeline.rs

1//! Dynamic Context Pipeline
2//!
3//! A [`ContextPipeline`] solves the context-forgetting problem in
4//! long-running AI tasks. Instead of relying solely on a static
5//! [`ContextSnapshot`](super::context::ContextSnapshot) captured at
6//! Run start, a ContextPipeline accumulates incremental
7//! [`ContextFrame`]s throughout the workflow.
8//!
9//! # Position in Lifecycle
10//!
11//! ```text
12//!  ②  Intent (Active)         ← content analyzed
13//!       │
14//!       ▼
15//!      ContextPipeline created ← seeded with IntentAnalysis frame
16//!       │
17//!       ▼
18//!  ③  Plan (Plan.pipeline → Pipeline, Plan.fwindow = visible range)
19//!       │  steps execute
20//!       ▼
21//!      Frames accumulate       ← StepSummary, CodeChange, ToolCall, ...
22//!       │
23//!       ▼
24//!      Replan? → new Plan with updated fwindow
25//! ```
26//!
27//! The pipeline is created *after* an Intent's content is analyzed
28//! (step ②) but *before* a Plan exists. The initial
29//! [`IntentAnalysis`](FrameKind::IntentAnalysis) frame captures the
30//! AI's structured interpretation, which serves as the foundation
31//! for Plan creation. The [`Plan`](super::plan::Plan) then references
32//! this pipeline via `pipeline` and records the visible frame range
33//! via `fwindow`. During execution, frames accumulate to track
34//! step-by-step progress.
35//!
36//! # Relationship to Other Objects
37//!
38//! ```text
39//! Intent ──plan──→ Plan ──pipeline──→ ContextPipeline
40//!                   │                        │
41//!              [PlanStep₀, ...]   [IntentAnalysis, StepSummary, ...]
42//!                   │                        ▲
43//!              iframes/oframes ──────────────┘
44//! ```
45//!
46//! | From | Field | To | Notes |
47//! |------|-------|----|-------|
48//! | Plan | `pipeline` | ContextPipeline | 0..1 |
49//! | PlanStep | `iframes` | ContextFrame IDs | consumed context |
50//! | PlanStep | `oframes` | ContextFrame IDs | produced context |
51//!
52//! The pipeline itself has no back-references — it is a passive
53//! container. [`PlanStep`](super::plan::PlanStep)s own the
54//! association via `iframes` and `oframes`.
55//!
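//! # Eviction
//!
//! When `max_frames > 0` and pushing a frame leaves the pipeline over the
//! limit, the oldest evictable frames are removed until it fits again.
//! `IntentAnalysis` and `Checkpoint` frames are **protected** from
//! eviction — they always survive. As a result, a pipeline made up
//! entirely of protected frames may hold more than `max_frames` frames.
//! Eviction never changes the `frame_id` of a surviving frame.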
61//!
62//! # Purpose
63//!
64//! - **Context Continuity**: Maintains a rolling window of high-value
65//!   context for the agent's working memory across Plan steps.
66//! - **Incremental Updates**: Unlike the static ContextSnapshot, the
67//!   pipeline grows as work progresses, capturing step summaries,
68//!   code changes, and tool results.
69//! - **Bounded Memory**: `max_frames` + eviction ensures the pipeline
70//!   doesn't grow unboundedly in long-running workflows.
71//! - **Replan Support**: When replanning occurs, a new Plan can
72//!   reference the same pipeline with an updated `fwindow` that
73//!   includes frames accumulated since the previous plan.
74
75use std::fmt;
76
77use chrono::{DateTime, Utc};
78use serde::{Deserialize, Serialize};
79
80use crate::{
81    errors::GitError,
82    hash::ObjectHash,
83    internal::object::{
84        ObjectTrait,
85        types::{ActorRef, Header, ObjectType},
86    },
87};
88
/// The kind of context captured in a [`ContextFrame`].
///
/// Determines how the frame's `summary` and `data` should be
/// interpreted. `IntentAnalysis` and `Checkpoint` are protected
/// from eviction when `max_frames` is exceeded.
///
/// # Serialization
///
/// Because of `rename_all = "snake_case"`, unit variants serialize as
/// plain strings (e.g. `"step_summary"`), the same labels returned by
/// [`FrameKind::as_str`]. `Other(name)` serializes as
/// `{"other": "<name>"}`.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
#[serde(rename_all = "snake_case")]
pub enum FrameKind {
    /// Initial context derived from an Intent's analyzed content.
    ///
    /// Created when the AI fills in the `content` field on an Intent,
    /// serving as the foundation for subsequent Plan creation. This
    /// is the **seed frame** and is expected to be the first frame in a
    /// pipeline (by convention — pushing frames does not enforce this
    /// ordering).
    /// **Protected from eviction.**
    IntentAnalysis,
    /// Summary produced after a [`PlanStep`](super::plan::PlanStep)
    /// completes. Captures what the step accomplished so that
    /// subsequent steps have context.
    StepSummary,
    /// Code change digest (e.g. files modified, diff stats).
    /// Typically produced alongside a
    /// [`PatchSet`](super::patchset::PatchSet).
    CodeChange,
    /// System or environment state snapshot (e.g. memory usage,
    /// disk space, running services).
    SystemState,
    /// Context captured during error recovery. Records what went
    /// wrong and what corrective action was taken, so that subsequent
    /// steps don't repeat the same mistakes.
    ErrorRecovery,
    /// Explicit save-point created by user or system.
    /// **Protected from eviction.** Used for long-running workflows
    /// where the agent may be paused and resumed.
    Checkpoint,
    /// Result of an external tool invocation (MCP service, function
    /// call, REST API, CLI command, etc.).
    ///
    /// Intentionally protocol-agnostic: MCP is one transport for
    /// tool calls, but agents may also invoke tools via direct
    /// function calls, HTTP APIs, or shell commands. Protocol-specific
    /// details (server name, tool name, arguments, result preview)
    /// belong in `ContextFrame.data`.
    ToolCall,
    /// Application-defined context type not covered by the variants
    /// above.
    ///
    /// `Other` frames are never protected from eviction, even when the
    /// string matches a built-in label: `Other("checkpoint".into())`
    /// displays as `checkpoint` but compares unequal to
    /// [`FrameKind::Checkpoint`] and remains evictable.
    Other(String),
}
136
137impl FrameKind {
138    pub fn as_str(&self) -> &str {
139        match self {
140            FrameKind::IntentAnalysis => "intent_analysis",
141            FrameKind::StepSummary => "step_summary",
142            FrameKind::CodeChange => "code_change",
143            FrameKind::SystemState => "system_state",
144            FrameKind::ErrorRecovery => "error_recovery",
145            FrameKind::Checkpoint => "checkpoint",
146            FrameKind::ToolCall => "tool_call",
147            FrameKind::Other(s) => s.as_str(),
148        }
149    }
150}
151
152impl fmt::Display for FrameKind {
153    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
154        write!(f, "{}", self.as_str())
155    }
156}
157
/// A single context frame — a compact summary captured at a point in
/// time during the AI workflow.
///
/// Frames are **passive data records**. They carry no back-references
/// to the [`PlanStep`](super::plan::PlanStep) that consumed or produced
/// them; that association is tracked on the step side via `iframes`
/// and `oframes`.
///
/// Field declaration order is also the JSON key order that serde emits.
/// Reordering fields therefore changes the bytes produced when the owning
/// pipeline is serialized.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ContextFrame {
    /// Stable monotonic identifier for this frame.
    ///
    /// Assigned by [`ContextPipeline::push_frame`] from a monotonic
    /// counter (`next_frame_id`). Unlike Vec indices, frame IDs remain
    /// stable across eviction — [`PlanStep`](super::plan::PlanStep)s
    /// reference frames by ID via `iframes` and `oframes`.
    frame_id: u64,
    /// The kind of context this frame captures.
    ///
    /// Determines how `summary` and `data` should be interpreted.
    /// Also affects eviction: `IntentAnalysis` and `Checkpoint`
    /// frames are protected.
    kind: FrameKind,
    /// Compact human-readable summary of this frame's content.
    ///
    /// Should be concise (a few sentences). For example:
    /// - IntentAnalysis: "Add pagination to GET /users with limit/offset"
    /// - StepSummary: "Refactored auth module, 3 files changed"
    /// - CodeChange: "Modified src/api.rs (+42 -15)"
    summary: String,
    /// Structured data payload for machine consumption.
    ///
    /// Schema depends on `kind`. For example:
    /// - CodeChange: `{"files": ["src/api.rs"], "insertions": 42, "deletions": 15}`
    /// - ToolCall: `{"tool": "search", "args": {...}, "result_preview": "..."}`
    ///
    /// `None` when the `summary` is sufficient and no structured
    /// data is needed. Omitted from the serialized form when `None`.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    data: Option<serde_json::Value>,
    /// UTC timestamp of when this frame was created.
    ///
    /// Automatically set to `Utc::now()` by [`ContextFrame::new`].
    /// A pipeline keeps frames in push order. Pre-built frames can be
    /// pushed via `ContextPipeline::push_frame_raw` with any timestamp, so
    /// `created_at` is not guaranteed to increase along the frame list.
    created_at: DateTime<Utc>,
    /// Estimated token count for context-window budgeting.
    ///
    /// Used by the orchestrator to decide how many frames fit in
    /// the LLM's context window. `None` when the estimate hasn't
    /// been computed. See
    /// [`ContextPipeline::total_token_estimate`] for aggregation.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    token_estimate: Option<u64>,
}
211
212impl ContextFrame {
213    /// Create a new frame with the given kind and summary.
214    ///
215    /// `frame_id` is typically assigned by
216    /// [`ContextPipeline::push_frame`]; callers building frames
217    /// manually can pass any unique monotonic value.
218    pub fn new(frame_id: u64, kind: FrameKind, summary: impl Into<String>) -> Self {
219        Self {
220            frame_id,
221            kind,
222            summary: summary.into(),
223            data: None,
224            created_at: Utc::now(),
225            token_estimate: None,
226        }
227    }
228
229    /// Returns this frame's stable ID.
230    pub fn frame_id(&self) -> u64 {
231        self.frame_id
232    }
233
234    pub fn kind(&self) -> &FrameKind {
235        &self.kind
236    }
237
238    pub fn summary(&self) -> &str {
239        &self.summary
240    }
241
242    pub fn data(&self) -> Option<&serde_json::Value> {
243        self.data.as_ref()
244    }
245
246    pub fn created_at(&self) -> DateTime<Utc> {
247        self.created_at
248    }
249
250    pub fn token_estimate(&self) -> Option<u64> {
251        self.token_estimate
252    }
253
254    pub fn set_data(&mut self, data: Option<serde_json::Value>) {
255        self.data = data;
256    }
257
258    pub fn set_token_estimate(&mut self, token_estimate: Option<u64>) {
259        self.token_estimate = token_estimate;
260    }
261}
262
/// A dynamic context pipeline that accumulates
/// [`ContextFrame`]s throughout an AI workflow.
///
/// Created when an [`Intent`](super::intent::Intent)'s content is
/// first analyzed, seeded with an
/// [`IntentAnalysis`](FrameKind::IntentAnalysis) frame. The
/// [`Plan`](super::plan::Plan) references this pipeline via
/// `pipeline` as its context basis. See module documentation for
/// lifecycle position, eviction rules, and purpose.
///
/// The object is stored as JSON. serde emits fields in declaration order
/// (with the header flattened first), so reordering fields changes the
/// serialized bytes.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ContextPipeline {
    /// Common header (object ID, type, timestamps, creator, etc.).
    #[serde(flatten)]
    header: Header,
    /// Context frames in the order they were pushed.
    ///
    /// New frames are appended via [`push_frame`](ContextPipeline::push_frame).
    /// If `max_frames > 0` and the limit is exceeded, the oldest
    /// evictable frame is removed (see eviction rules in module docs).
    /// [`PlanStep`](super::plan::PlanStep)s reference frames by stable
    /// `frame_id` via `iframes` and `oframes`. Frame IDs are monotonic
    /// and survive eviction (indices do not).
    #[serde(default)]
    frames: Vec<ContextFrame>,
    /// Monotonic counter for assigning stable [`ContextFrame::frame_id`]s.
    ///
    /// Incremented by [`push_frame`](ContextPipeline::push_frame) each
    /// time a frame is added. Never decremented, even after eviction.
    /// Because of `#[serde(default)]`, serialized data that omits this
    /// field deserializes with `0` even when `frames` is non-empty.
    #[serde(default)]
    next_frame_id: u64,
    /// Maximum number of active frames before eviction kicks in.
    ///
    /// `0` means unlimited (no eviction). When the frame count
    /// exceeds this limit, the oldest non-protected frame is removed.
    /// `IntentAnalysis` and `Checkpoint` frames are protected and
    /// never evicted. The limit is therefore soft: if every frame is
    /// protected, `frames` can hold more than `max_frames` entries.
    #[serde(default)]
    max_frames: u32,
    /// Aggregated human-readable summary across all frames.
    ///
    /// Maintained by the orchestrator as a high-level overview of
    /// the pipeline's accumulated context. Useful for quickly
    /// understanding the overall progress without reading individual
    /// frames. `None` when no summary has been set; omitted from the
    /// serialized form in that case.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    global_summary: Option<String>,
}
310
311impl ContextPipeline {
312    /// Create a new empty pipeline.
313    ///
314    /// After creation, seed it with an [`IntentAnalysis`](FrameKind::IntentAnalysis)
315    /// frame, then create a [`Plan`](super::plan::Plan) that references this
316    /// pipeline via `pipeline`.
317    pub fn new(created_by: ActorRef) -> Result<Self, String> {
318        Ok(Self {
319            header: Header::new(ObjectType::ContextPipeline, created_by)?,
320            frames: Vec::new(),
321            next_frame_id: 0,
322            max_frames: 0,
323            global_summary: None,
324        })
325    }
326
327    pub fn header(&self) -> &Header {
328        &self.header
329    }
330
331    /// Returns all frames in chronological order.
332    pub fn frames(&self) -> &[ContextFrame] {
333        &self.frames
334    }
335
336    pub fn max_frames(&self) -> u32 {
337        self.max_frames
338    }
339
340    pub fn global_summary(&self) -> Option<&str> {
341        self.global_summary.as_deref()
342    }
343
344    pub fn set_max_frames(&mut self, max_frames: u32) {
345        self.max_frames = max_frames;
346    }
347
348    pub fn set_global_summary(&mut self, summary: Option<String>) {
349        self.global_summary = summary;
350    }
351
352    /// Append a frame, assigning it a stable `frame_id`.
353    ///
354    /// Returns the assigned `frame_id`. If `max_frames > 0` and the
355    /// limit is exceeded, the oldest evictable (non-IntentAnalysis,
356    /// non-Checkpoint) frame is removed to make room. Eviction does
357    /// not affect the IDs of surviving frames.
358    pub fn push_frame(&mut self, kind: FrameKind, summary: impl Into<String>) -> u64 {
359        let frame = ContextFrame::new(self.next_frame_id, kind, summary);
360        self.push_frame_raw(frame)
361    }
362
363    /// Append a pre-built frame, overwriting its `frame_id` with the
364    /// next monotonic ID. Returns the assigned `frame_id`.
365    ///
366    /// Use this when you need to set properties (e.g. `token_estimate`,
367    /// `data`) on the frame before pushing.
368    pub fn push_frame_raw(&mut self, mut frame: ContextFrame) -> u64 {
369        let id = self.next_frame_id;
370        self.next_frame_id += 1;
371        frame.frame_id = id;
372        self.frames.push(frame);
373        self.evict_if_needed();
374        id
375    }
376
377    /// Look up a frame by its stable `frame_id`.
378    ///
379    /// Returns `None` if the frame has been evicted or the ID is invalid.
380    pub fn frame_by_id(&self, frame_id: u64) -> Option<&ContextFrame> {
381        self.frames.iter().find(|f| f.frame_id == frame_id)
382    }
383
384    /// Returns frames that contribute to the active context window
385    /// (i.e. all current frames after any eviction has been applied).
386    pub fn active_frames(&self) -> &[ContextFrame] {
387        &self.frames
388    }
389
390    /// Total estimated tokens across all frames.
391    pub fn total_token_estimate(&self) -> u64 {
392        self.frames.iter().filter_map(|f| f.token_estimate).sum()
393    }
394
395    /// Evict the oldest evictable frame if over the limit.
396    ///
397    /// `IntentAnalysis` and `Checkpoint` frames are protected from eviction.
398    fn evict_if_needed(&mut self) {
399        if self.max_frames == 0 {
400            return;
401        }
402        while self.frames.len() > self.max_frames as usize {
403            // Find the first evictable frame (not IntentAnalysis or Checkpoint)
404            if let Some(pos) = self.frames.iter().position(|f| {
405                f.kind != FrameKind::Checkpoint && f.kind != FrameKind::IntentAnalysis
406            }) {
407                self.frames.remove(pos);
408            } else {
409                // All frames are protected — nothing to evict
410                break;
411            }
412        }
413    }
414}
415
416impl fmt::Display for ContextPipeline {
417    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
418        write!(f, "ContextPipeline: {}", self.header.object_id())
419    }
420}
421
422impl ObjectTrait for ContextPipeline {
423    fn from_bytes(data: &[u8], _hash: ObjectHash) -> Result<Self, GitError>
424    where
425        Self: Sized,
426    {
427        serde_json::from_slice(data)
428            .map_err(|e| GitError::InvalidContextPipelineObject(e.to_string()))
429    }
430
431    fn get_type(&self) -> ObjectType {
432        ObjectType::ContextPipeline
433    }
434
435    fn get_size(&self) -> usize {
436        match serde_json::to_vec(self) {
437            Ok(v) => v.len(),
438            Err(e) => {
439                tracing::warn!("failed to compute ContextPipeline size: {}", e);
440                0
441            }
442        }
443    }
444
445    fn to_data(&self) -> Result<Vec<u8>, GitError> {
446        serde_json::to_vec(self).map_err(|e| GitError::InvalidContextPipelineObject(e.to_string()))
447    }
448}
449
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds an empty pipeline owned by a test agent.
    fn make_pipeline() -> ContextPipeline {
        let actor = ActorRef::agent("orchestrator").expect("actor");
        ContextPipeline::new(actor).expect("pipeline")
    }

    #[test]
    fn test_pipeline_creation() {
        let pipeline = make_pipeline();

        assert_eq!(
            pipeline.header().object_type(),
            &ObjectType::ContextPipeline
        );
        assert!(pipeline.frames().is_empty());
        assert_eq!(pipeline.max_frames(), 0);
        assert!(pipeline.global_summary().is_none());
    }

    #[test]
    fn test_push_and_retrieve_frames() {
        let mut pipeline = make_pipeline();

        let mut f1 = ContextFrame::new(0, FrameKind::StepSummary, "Completed auth refactor");
        f1.set_token_estimate(Some(200));
        let id0 = pipeline.push_frame_raw(f1);

        let id1 = pipeline.push_frame(FrameKind::CodeChange, "Modified 3 files, +120 -45 lines");

        let mut f3 = ContextFrame::new(0, FrameKind::Checkpoint, "User save-point");
        f3.set_data(Some(serde_json::json!({"key": "value"})));
        let id2 = pipeline.push_frame_raw(f3);

        assert_eq!(pipeline.frames().len(), 3);
        assert_eq!(id0, 0);
        assert_eq!(id1, 1);
        assert_eq!(id2, 2);
        assert_eq!(
            pipeline.frame_by_id(0).unwrap().kind(),
            &FrameKind::StepSummary
        );
        assert_eq!(
            pipeline.frame_by_id(1).unwrap().kind(),
            &FrameKind::CodeChange
        );
        assert_eq!(
            pipeline.frame_by_id(2).unwrap().kind(),
            &FrameKind::Checkpoint
        );
        assert!(pipeline.frame_by_id(2).unwrap().data().is_some());
    }

    #[test]
    fn test_max_frames_eviction() {
        let mut pipeline = make_pipeline();
        pipeline.set_max_frames(3);

        // Push a checkpoint (should survive eviction)
        let cp_id = pipeline.push_frame(FrameKind::Checkpoint, "save-point");
        // Push regular frames
        let s1_id = pipeline.push_frame(FrameKind::StepSummary, "step 1");
        pipeline.push_frame(FrameKind::StepSummary, "step 2");
        assert_eq!(pipeline.frames().len(), 3);

        // This push exceeds max_frames → oldest non-Checkpoint ("step 1") is evicted
        pipeline.push_frame(FrameKind::CodeChange, "code change");
        assert_eq!(pipeline.frames().len(), 3);

        // Checkpoint survived, "step 1" was evicted
        assert!(pipeline.frame_by_id(cp_id).is_some());
        assert!(pipeline.frame_by_id(s1_id).is_none()); // evicted
        assert_eq!(pipeline.frames()[0].kind(), &FrameKind::Checkpoint);
        assert_eq!(pipeline.frames()[1].summary(), "step 2");
        assert_eq!(pipeline.frames()[2].summary(), "code change");
    }

    #[test]
    fn test_total_token_estimate() {
        let mut pipeline = make_pipeline();

        let mut f1 = ContextFrame::new(0, FrameKind::StepSummary, "s1");
        f1.set_token_estimate(Some(100));
        pipeline.push_frame_raw(f1);

        let mut f2 = ContextFrame::new(0, FrameKind::StepSummary, "s2");
        f2.set_token_estimate(Some(250));
        pipeline.push_frame_raw(f2);

        // Frame without token estimate
        pipeline.push_frame(FrameKind::Checkpoint, "cp");

        assert_eq!(pipeline.total_token_estimate(), 350);
    }

    #[test]
    fn test_serialization_roundtrip() {
        let mut pipeline = make_pipeline();
        pipeline.set_global_summary(Some("Overall progress summary".to_string()));

        let mut frame = ContextFrame::new(0, FrameKind::StepSummary, "did stuff");
        frame.set_token_estimate(Some(150));
        frame.set_data(Some(serde_json::json!({"files": ["a.rs", "b.rs"]})));
        pipeline.push_frame_raw(frame);

        let data = pipeline.to_data().expect("serialize");
        let restored =
            ContextPipeline::from_bytes(&data, ObjectHash::default()).expect("deserialize");

        assert_eq!(restored.frames().len(), 1);
        assert_eq!(restored.frames()[0].frame_id(), 0);
        assert_eq!(restored.frames()[0].summary(), "did stuff");
        assert_eq!(restored.frames()[0].token_estimate(), Some(150));
        assert_eq!(restored.global_summary(), Some("Overall progress summary"));
    }

    #[test]
    fn test_intent_analysis_frame_survives_eviction() {
        let mut pipeline = make_pipeline();
        pipeline.set_max_frames(2);

        // Seed with IntentAnalysis (protected)
        let ia_id = pipeline.push_frame(FrameKind::IntentAnalysis, "AI analysis of user intent");
        let s1_id = pipeline.push_frame(FrameKind::StepSummary, "step 1");
        assert_eq!(pipeline.frames().len(), 2);

        // Adding another frame should evict "step 1", not IntentAnalysis
        pipeline.push_frame(FrameKind::CodeChange, "code change");
        assert_eq!(pipeline.frames().len(), 2);
        assert!(pipeline.frame_by_id(ia_id).is_some());
        assert!(pipeline.frame_by_id(s1_id).is_none()); // evicted
        assert_eq!(pipeline.frames()[0].kind(), &FrameKind::IntentAnalysis);
        assert_eq!(pipeline.frames()[1].summary(), "code change");
    }

    #[test]
    fn test_all_protected_frames_may_exceed_limit() {
        let mut pipeline = make_pipeline();
        pipeline.set_max_frames(1);

        let ia_id = pipeline.push_frame(FrameKind::IntentAnalysis, "analysis");
        let cp_id = pipeline.push_frame(FrameKind::Checkpoint, "save-point");

        // Nothing is evictable, so the soft limit is exceeded rather than
        // dropping protected context.
        assert_eq!(pipeline.frames().len(), 2);
        assert!(pipeline.frame_by_id(ia_id).is_some());
        assert!(pipeline.frame_by_id(cp_id).is_some());
    }

    #[test]
    fn test_frame_ids_are_not_reused_after_eviction() {
        let mut pipeline = make_pipeline();
        pipeline.set_max_frames(1);

        let a = pipeline.push_frame(FrameKind::StepSummary, "a");
        let b = pipeline.push_frame(FrameKind::StepSummary, "b");
        let c = pipeline.push_frame(FrameKind::StepSummary, "c");

        assert_eq!((a, b, c), (0, 1, 2));
        assert_eq!(pipeline.frames().len(), 1);
        assert!(pipeline.frame_by_id(a).is_none());
        assert!(pipeline.frame_by_id(b).is_none());
        assert_eq!(pipeline.frame_by_id(c).unwrap().summary(), "c");
    }

    #[test]
    fn test_unknown_frame_id_returns_none() {
        let mut pipeline = make_pipeline();
        pipeline.push_frame(FrameKind::StepSummary, "only");

        assert!(pipeline.frame_by_id(42).is_none());
    }

    #[test]
    fn test_roundtrip_continues_frame_ids() {
        let mut pipeline = make_pipeline();
        pipeline.push_frame(FrameKind::StepSummary, "first");
        pipeline.push_frame(FrameKind::StepSummary, "second");

        let data = pipeline.to_data().expect("serialize");
        let mut restored =
            ContextPipeline::from_bytes(&data, ObjectHash::default()).expect("deserialize");

        // The counter is persisted, so new frames continue the sequence.
        let next = restored.push_frame(FrameKind::CodeChange, "third");
        assert_eq!(next, 2);
        assert_eq!(restored.frames().len(), 3);
    }

    #[test]
    fn test_frame_kind_labels_match_serde_names() {
        let unit_kinds = [
            FrameKind::IntentAnalysis,
            FrameKind::StepSummary,
            FrameKind::CodeChange,
            FrameKind::SystemState,
            FrameKind::ErrorRecovery,
            FrameKind::Checkpoint,
            FrameKind::ToolCall,
        ];
        for kind in unit_kinds {
            let encoded = serde_json::to_value(&kind).expect("serialize kind");
            assert_eq!(encoded, serde_json::Value::String(kind.as_str().to_string()));
            assert_eq!(kind.to_string(), kind.as_str());
        }

        let custom = FrameKind::Other("custom_kind".to_string());
        assert_eq!(custom.as_str(), "custom_kind");
        assert_eq!(
            serde_json::to_value(&custom).expect("serialize other"),
            serde_json::json!({"other": "custom_kind"})
        );
        let decoded: FrameKind =
            serde_json::from_value(serde_json::json!({"other": "custom_kind"}))
                .expect("deserialize other");
        assert_eq!(decoded, custom);
    }
}