git_internal/internal/object/pipeline.rs
1//! Dynamic Context Pipeline
2//!
3//! A [`ContextPipeline`] solves the context-forgetting problem in
4//! long-running AI tasks. Instead of relying solely on a static
5//! [`ContextSnapshot`](super::context::ContextSnapshot) captured at
6//! Run start, a ContextPipeline accumulates incremental
7//! [`ContextFrame`]s throughout the workflow.
8//!
9//! # Position in Lifecycle
10//!
//! ```text
//!  ②  Intent (Active)            ← content analyzed
//!       │
//!       ▼
//!     ContextPipeline created    ← seeded with IntentAnalysis frame
//!       │
//!       ▼
//!  ③  Plan (Plan.pipeline → Pipeline, Plan.fwindow = visible range)
//!       │  steps execute
//!       ▼
//!     Frames accumulate          ← StepSummary, CodeChange, ToolCall, ...
//!       │
//!       ▼
//!     Replan? → new Plan with updated fwindow
//! ```
26//!
27//! The pipeline is created *after* an Intent's content is analyzed
28//! (step ②) but *before* a Plan exists. The initial
29//! [`IntentAnalysis`](FrameKind::IntentAnalysis) frame captures the
30//! AI's structured interpretation, which serves as the foundation
31//! for Plan creation. The [`Plan`](super::plan::Plan) then references
32//! this pipeline via `pipeline` and records the visible frame range
33//! via `fwindow`. During execution, frames accumulate to track
34//! step-by-step progress.
35//!
36//! # Relationship to Other Objects
37//!
//! ```text
//! Intent ──plan──→ Plan ──pipeline──→ ContextPipeline
//!                   │                        │
//!           [PlanStep₀, ...]   [IntentAnalysis, StepSummary, ...]
//!                   │                        ▲
//!           iframes/oframes ─────────────────┘
//! ```
45//!
46//! | From | Field | To | Notes |
47//! |------|-------|----|-------|
48//! | Plan | `pipeline` | ContextPipeline | 0..1 |
49//! | PlanStep | `iframes` | ContextFrame IDs | consumed context |
50//! | PlanStep | `oframes` | ContextFrame IDs | produced context |
51//!
52//! The pipeline itself has no back-references — it is a passive
53//! container. [`PlanStep`](super::plan::PlanStep)s own the
54//! association via `iframes` and `oframes`.
55//!
56//! # Eviction
57//!
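//! When `max_frames > 0` and the limit is exceeded, the oldest
//! evictable frame is removed. `IntentAnalysis` and `Checkpoint`
//! frames are **protected** from eviction — they always survive.
//! If every frame is protected there is nothing to evict, so the
//! pipeline may hold more than `max_frames` frames.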
61//!
62//! # Purpose
63//!
64//! - **Context Continuity**: Maintains a rolling window of high-value
65//! context for the agent's working memory across Plan steps.
66//! - **Incremental Updates**: Unlike the static ContextSnapshot, the
67//! pipeline grows as work progresses, capturing step summaries,
68//! code changes, and tool results.
69//! - **Bounded Memory**: `max_frames` + eviction ensures the pipeline
70//! doesn't grow unboundedly in long-running workflows.
71//! - **Replan Support**: When replanning occurs, a new Plan can
72//! reference the same pipeline with an updated `fwindow` that
73//! includes frames accumulated since the previous plan.
74
75use std::fmt;
76
77use chrono::{DateTime, Utc};
78use serde::{Deserialize, Serialize};
79
80use crate::{
81 errors::GitError,
82 hash::ObjectHash,
83 internal::object::{
84 ObjectTrait,
85 types::{ActorRef, Header, ObjectType},
86 },
87};
88
89/// The kind of context captured in a [`ContextFrame`].
90///
91/// Determines how the frame's `summary` and `data` should be
92/// interpreted. `IntentAnalysis` and `Checkpoint` are protected
93/// from eviction when `max_frames` is exceeded.
94#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
95#[serde(rename_all = "snake_case")]
96pub enum FrameKind {
97 /// Initial context derived from an Intent's analyzed content.
98 ///
99 /// Created when the AI fills in the `content` field on an Intent,
100 /// serving as the foundation for subsequent Plan creation. This
101 /// is the **seed frame** — always the first frame in a pipeline.
102 /// **Protected from eviction.**
103 IntentAnalysis,
104 /// Summary produced after a [`PlanStep`](super::plan::PlanStep)
105 /// completes. Captures what the step accomplished so that
106 /// subsequent steps have context.
107 StepSummary,
108 /// Code change digest (e.g. files modified, diff stats).
109 /// Typically produced alongside a
110 /// [`PatchSet`](super::patchset::PatchSet).
111 CodeChange,
112 /// System or environment state snapshot (e.g. memory usage,
113 /// disk space, running services).
114 SystemState,
115 /// Context captured during error recovery. Records what went
116 /// wrong and what corrective action was taken, so that subsequent
117 /// steps don't repeat the same mistakes.
118 ErrorRecovery,
119 /// Explicit save-point created by user or system.
120 /// **Protected from eviction.** Used for long-running workflows
121 /// where the agent may be paused and resumed.
122 Checkpoint,
123 /// Result of an external tool invocation (MCP service, function
124 /// call, REST API, CLI command, etc.).
125 ///
126 /// Intentionally protocol-agnostic: MCP is one transport for
127 /// tool calls, but agents may also invoke tools via direct
128 /// function calls, HTTP APIs, or shell commands. Protocol-specific
129 /// details (server name, tool name, arguments, result preview)
130 /// belong in `ContextFrame.data`.
131 ToolCall,
132 /// Application-defined context type not covered by the variants
133 /// above.
134 Other(String),
135}
136
137impl FrameKind {
138 pub fn as_str(&self) -> &str {
139 match self {
140 FrameKind::IntentAnalysis => "intent_analysis",
141 FrameKind::StepSummary => "step_summary",
142 FrameKind::CodeChange => "code_change",
143 FrameKind::SystemState => "system_state",
144 FrameKind::ErrorRecovery => "error_recovery",
145 FrameKind::Checkpoint => "checkpoint",
146 FrameKind::ToolCall => "tool_call",
147 FrameKind::Other(s) => s.as_str(),
148 }
149 }
150}
151
152impl fmt::Display for FrameKind {
153 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
154 write!(f, "{}", self.as_str())
155 }
156}
157
158/// A single context frame — a compact summary captured at a point in
159/// time during the AI workflow.
160///
161/// Frames are **passive data records**. They carry no back-references
162/// to the [`PlanStep`](super::plan::PlanStep) that consumed or produced
163/// them; that association is tracked on the step side via `iframes`
164/// and `oframes`.
165#[derive(Debug, Clone, Serialize, Deserialize)]
166pub struct ContextFrame {
167 /// Stable monotonic identifier for this frame.
168 ///
169 /// Assigned by [`ContextPipeline::push_frame`] from a monotonic
170 /// counter (`next_frame_id`). Unlike Vec indices, frame IDs remain
171 /// stable across eviction — [`PlanStep`](super::plan::PlanStep)s
172 /// reference frames by ID via `iframes` and `oframes`.
173 frame_id: u64,
174 /// The kind of context this frame captures.
175 ///
176 /// Determines how `summary` and `data` should be interpreted.
177 /// Also affects eviction: `IntentAnalysis` and `Checkpoint`
178 /// frames are protected.
179 kind: FrameKind,
180 /// Compact human-readable summary of this frame's content.
181 ///
182 /// Should be concise (a few sentences). For example:
183 /// - IntentAnalysis: "Add pagination to GET /users with limit/offset"
184 /// - StepSummary: "Refactored auth module, 3 files changed"
185 /// - CodeChange: "Modified src/api.rs (+42 -15)"
186 summary: String,
187 /// Structured data payload for machine consumption.
188 ///
189 /// Schema depends on `kind`. For example:
190 /// - CodeChange: `{"files": ["src/api.rs"], "insertions": 42, "deletions": 15}`
191 /// - ToolCall: `{"tool": "search", "args": {...}, "result_preview": "..."}`
192 ///
193 /// `None` when the `summary` is sufficient and no structured
194 /// data is needed.
195 #[serde(default, skip_serializing_if = "Option::is_none")]
196 data: Option<serde_json::Value>,
197 /// UTC timestamp of when this frame was created.
198 ///
199 /// Automatically set to `Utc::now()` by [`ContextFrame::new`].
200 /// Frames within a pipeline are chronologically ordered.
201 created_at: DateTime<Utc>,
202 /// Estimated token count for context-window budgeting.
203 ///
204 /// Used by the orchestrator to decide how many frames fit in
205 /// the LLM's context window. `None` when the estimate hasn't
206 /// been computed. See
207 /// [`ContextPipeline::total_token_estimate`] for aggregation.
208 #[serde(default, skip_serializing_if = "Option::is_none")]
209 token_estimate: Option<u64>,
210}
211
212impl ContextFrame {
213 /// Create a new frame with the given kind and summary.
214 ///
215 /// `frame_id` is typically assigned by
216 /// [`ContextPipeline::push_frame`]; callers building frames
217 /// manually can pass any unique monotonic value.
218 pub fn new(frame_id: u64, kind: FrameKind, summary: impl Into<String>) -> Self {
219 Self {
220 frame_id,
221 kind,
222 summary: summary.into(),
223 data: None,
224 created_at: Utc::now(),
225 token_estimate: None,
226 }
227 }
228
229 /// Returns this frame's stable ID.
230 pub fn frame_id(&self) -> u64 {
231 self.frame_id
232 }
233
234 pub fn kind(&self) -> &FrameKind {
235 &self.kind
236 }
237
238 pub fn summary(&self) -> &str {
239 &self.summary
240 }
241
242 pub fn data(&self) -> Option<&serde_json::Value> {
243 self.data.as_ref()
244 }
245
246 pub fn created_at(&self) -> DateTime<Utc> {
247 self.created_at
248 }
249
250 pub fn token_estimate(&self) -> Option<u64> {
251 self.token_estimate
252 }
253
254 pub fn set_data(&mut self, data: Option<serde_json::Value>) {
255 self.data = data;
256 }
257
258 pub fn set_token_estimate(&mut self, token_estimate: Option<u64>) {
259 self.token_estimate = token_estimate;
260 }
261}
262
263/// A dynamic context pipeline that accumulates
264/// [`ContextFrame`]s throughout an AI workflow.
265///
266/// Created when an [`Intent`](super::intent::Intent)'s content is
267/// first analyzed, seeded with an
268/// [`IntentAnalysis`](FrameKind::IntentAnalysis) frame. The
269/// [`Plan`](super::plan::Plan) references this pipeline via
270/// `pipeline` as its context basis. See module documentation for
271/// lifecycle position, eviction rules, and purpose.
272#[derive(Debug, Clone, Serialize, Deserialize)]
273pub struct ContextPipeline {
274 /// Common header (object ID, type, timestamps, creator, etc.).
275 #[serde(flatten)]
276 header: Header,
277 /// Chronologically ordered context frames.
278 ///
279 /// New frames are appended via [`push_frame`](ContextPipeline::push_frame).
280 /// If `max_frames > 0` and the limit is exceeded, the oldest
281 /// evictable frame is removed (see eviction rules in module docs).
282 /// [`PlanStep`](super::plan::PlanStep)s reference frames by stable
283 /// `frame_id` via `iframes` and `oframes`. Frame IDs are monotonic
284 /// and survive eviction (indices do not).
285 #[serde(default)]
286 frames: Vec<ContextFrame>,
287 /// Monotonic counter for assigning stable [`ContextFrame::frame_id`]s.
288 ///
289 /// Incremented by [`push_frame`](ContextPipeline::push_frame) each
290 /// time a frame is added. Never decremented, even after eviction.
291 #[serde(default)]
292 next_frame_id: u64,
293 /// Maximum number of active frames before eviction kicks in.
294 ///
295 /// `0` means unlimited (no eviction). When the frame count
296 /// exceeds this limit, the oldest non-protected frame is removed.
297 /// `IntentAnalysis` and `Checkpoint` frames are protected and
298 /// never evicted.
299 #[serde(default)]
300 max_frames: u32,
301 /// Aggregated human-readable summary across all frames.
302 ///
303 /// Maintained by the orchestrator as a high-level overview of
304 /// the pipeline's accumulated context. Useful for quickly
305 /// understanding the overall progress without reading individual
306 /// frames. `None` when no summary has been set.
307 #[serde(default, skip_serializing_if = "Option::is_none")]
308 global_summary: Option<String>,
309}
310
311impl ContextPipeline {
312 /// Create a new empty pipeline.
313 ///
314 /// After creation, seed it with an [`IntentAnalysis`](FrameKind::IntentAnalysis)
315 /// frame, then create a [`Plan`](super::plan::Plan) that references this
316 /// pipeline via `pipeline`.
317 pub fn new(created_by: ActorRef) -> Result<Self, String> {
318 Ok(Self {
319 header: Header::new(ObjectType::ContextPipeline, created_by)?,
320 frames: Vec::new(),
321 next_frame_id: 0,
322 max_frames: 0,
323 global_summary: None,
324 })
325 }
326
327 pub fn header(&self) -> &Header {
328 &self.header
329 }
330
331 /// Returns all frames in chronological order.
332 pub fn frames(&self) -> &[ContextFrame] {
333 &self.frames
334 }
335
336 pub fn max_frames(&self) -> u32 {
337 self.max_frames
338 }
339
340 pub fn global_summary(&self) -> Option<&str> {
341 self.global_summary.as_deref()
342 }
343
344 pub fn set_max_frames(&mut self, max_frames: u32) {
345 self.max_frames = max_frames;
346 }
347
348 pub fn set_global_summary(&mut self, summary: Option<String>) {
349 self.global_summary = summary;
350 }
351
352 /// Append a frame, assigning it a stable `frame_id`.
353 ///
354 /// Returns the assigned `frame_id`. If `max_frames > 0` and the
355 /// limit is exceeded, the oldest evictable (non-IntentAnalysis,
356 /// non-Checkpoint) frame is removed to make room. Eviction does
357 /// not affect the IDs of surviving frames.
358 pub fn push_frame(&mut self, kind: FrameKind, summary: impl Into<String>) -> u64 {
359 let frame = ContextFrame::new(self.next_frame_id, kind, summary);
360 self.push_frame_raw(frame)
361 }
362
363 /// Append a pre-built frame, overwriting its `frame_id` with the
364 /// next monotonic ID. Returns the assigned `frame_id`.
365 ///
366 /// Use this when you need to set properties (e.g. `token_estimate`,
367 /// `data`) on the frame before pushing.
368 pub fn push_frame_raw(&mut self, mut frame: ContextFrame) -> u64 {
369 let id = self.next_frame_id;
370 self.next_frame_id += 1;
371 frame.frame_id = id;
372 self.frames.push(frame);
373 self.evict_if_needed();
374 id
375 }
376
377 /// Look up a frame by its stable `frame_id`.
378 ///
379 /// Returns `None` if the frame has been evicted or the ID is invalid.
380 pub fn frame_by_id(&self, frame_id: u64) -> Option<&ContextFrame> {
381 self.frames.iter().find(|f| f.frame_id == frame_id)
382 }
383
384 /// Returns frames that contribute to the active context window
385 /// (i.e. all current frames after any eviction has been applied).
386 pub fn active_frames(&self) -> &[ContextFrame] {
387 &self.frames
388 }
389
390 /// Total estimated tokens across all frames.
391 pub fn total_token_estimate(&self) -> u64 {
392 self.frames.iter().filter_map(|f| f.token_estimate).sum()
393 }
394
395 /// Evict the oldest evictable frame if over the limit.
396 ///
397 /// `IntentAnalysis` and `Checkpoint` frames are protected from eviction.
398 fn evict_if_needed(&mut self) {
399 if self.max_frames == 0 {
400 return;
401 }
402 while self.frames.len() > self.max_frames as usize {
403 // Find the first evictable frame (not IntentAnalysis or Checkpoint)
404 if let Some(pos) = self.frames.iter().position(|f| {
405 f.kind != FrameKind::Checkpoint && f.kind != FrameKind::IntentAnalysis
406 }) {
407 self.frames.remove(pos);
408 } else {
409 // All frames are protected — nothing to evict
410 break;
411 }
412 }
413 }
414}
415
416impl fmt::Display for ContextPipeline {
417 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
418 write!(f, "ContextPipeline: {}", self.header.object_id())
419 }
420}
421
422impl ObjectTrait for ContextPipeline {
423 fn from_bytes(data: &[u8], _hash: ObjectHash) -> Result<Self, GitError>
424 where
425 Self: Sized,
426 {
427 serde_json::from_slice(data)
428 .map_err(|e| GitError::InvalidContextPipelineObject(e.to_string()))
429 }
430
431 fn get_type(&self) -> ObjectType {
432 ObjectType::ContextPipeline
433 }
434
435 fn get_size(&self) -> usize {
436 match serde_json::to_vec(self) {
437 Ok(v) => v.len(),
438 Err(e) => {
439 tracing::warn!("failed to compute ContextPipeline size: {}", e);
440 0
441 }
442 }
443 }
444
445 fn to_data(&self) -> Result<Vec<u8>, GitError> {
446 serde_json::to_vec(self).map_err(|e| GitError::InvalidContextPipelineObject(e.to_string()))
447 }
448}
449
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds an empty pipeline owned by a test agent.
    fn make_pipeline() -> ContextPipeline {
        let actor = ActorRef::agent("orchestrator").expect("actor");
        ContextPipeline::new(actor).expect("pipeline")
    }

    /// Collects the IDs of the frames currently stored, in order.
    fn frame_ids(pipeline: &ContextPipeline) -> Vec<u64> {
        pipeline
            .frames()
            .iter()
            .map(ContextFrame::frame_id)
            .collect()
    }

    #[test]
    fn test_pipeline_creation() {
        let pipeline = make_pipeline();

        assert_eq!(
            pipeline.header().object_type(),
            &ObjectType::ContextPipeline
        );
        assert!(pipeline.frames().is_empty());
        assert_eq!(pipeline.max_frames(), 0);
        assert!(pipeline.global_summary().is_none());
    }

    #[test]
    fn test_push_and_retrieve_frames() {
        let mut pipeline = make_pipeline();

        let mut f1 = ContextFrame::new(0, FrameKind::StepSummary, "Completed auth refactor");
        f1.set_token_estimate(Some(200));
        let id0 = pipeline.push_frame_raw(f1);

        let id1 = pipeline.push_frame(FrameKind::CodeChange, "Modified 3 files, +120 -45 lines");

        let mut f3 = ContextFrame::new(0, FrameKind::Checkpoint, "User save-point");
        f3.set_data(Some(serde_json::json!({"key": "value"})));
        let id2 = pipeline.push_frame_raw(f3);

        assert_eq!(pipeline.frames().len(), 3);
        assert_eq!(id0, 0);
        assert_eq!(id1, 1);
        assert_eq!(id2, 2);
        assert_eq!(
            pipeline.frame_by_id(0).unwrap().kind(),
            &FrameKind::StepSummary
        );
        assert_eq!(
            pipeline.frame_by_id(1).unwrap().kind(),
            &FrameKind::CodeChange
        );
        assert_eq!(
            pipeline.frame_by_id(2).unwrap().kind(),
            &FrameKind::Checkpoint
        );
        assert!(pipeline.frame_by_id(2).unwrap().data().is_some());
    }

    #[test]
    fn test_max_frames_eviction() {
        let mut pipeline = make_pipeline();
        pipeline.set_max_frames(3);

        // Push a checkpoint (should survive eviction)
        let cp_id = pipeline.push_frame(FrameKind::Checkpoint, "save-point");
        // Push regular frames
        let s1_id = pipeline.push_frame(FrameKind::StepSummary, "step 1");
        pipeline.push_frame(FrameKind::StepSummary, "step 2");
        assert_eq!(pipeline.frames().len(), 3);

        // This push exceeds max_frames → oldest non-Checkpoint ("step 1") is evicted
        pipeline.push_frame(FrameKind::CodeChange, "code change");
        assert_eq!(pipeline.frames().len(), 3);

        // Checkpoint survived, "step 1" was evicted
        assert!(pipeline.frame_by_id(cp_id).is_some());
        assert!(pipeline.frame_by_id(s1_id).is_none()); // evicted
        assert_eq!(pipeline.frames()[0].kind(), &FrameKind::Checkpoint);
        assert_eq!(pipeline.frames()[1].summary(), "step 2");
        assert_eq!(pipeline.frames()[2].summary(), "code change");
    }

    #[test]
    fn test_frame_ids_stay_stable_after_eviction() {
        let mut pipeline = make_pipeline();
        pipeline.set_max_frames(2);

        pipeline.push_frame(FrameKind::StepSummary, "step 1");
        pipeline.push_frame(FrameKind::StepSummary, "step 2");
        let newest = pipeline.push_frame(FrameKind::CodeChange, "code change");

        // The evicted ID (0) is not reused and survivors keep their IDs.
        assert_eq!(newest, 2);
        assert_eq!(frame_ids(&pipeline), vec![1, 2]);
    }

    #[test]
    fn test_lowering_limit_evicts_on_next_push() {
        let mut pipeline = make_pipeline();
        for step in 0..4 {
            pipeline.push_frame(FrameKind::StepSummary, format!("step {}", step));
        }

        // Lowering the limit alone removes nothing...
        pipeline.set_max_frames(2);
        assert_eq!(pipeline.frames().len(), 4);

        // ...the surplus is evicted, oldest first, on the next push.
        pipeline.push_frame(FrameKind::CodeChange, "code change");
        assert_eq!(frame_ids(&pipeline), vec![3, 4]);
    }

    #[test]
    fn test_protected_frames_may_exceed_limit() {
        let mut pipeline = make_pipeline();
        pipeline.set_max_frames(1);

        let ia_id = pipeline.push_frame(FrameKind::IntentAnalysis, "analysis");
        let cp_id = pipeline.push_frame(FrameKind::Checkpoint, "save-point");

        // Nothing is evictable, so both protected frames are kept.
        assert_eq!(pipeline.frames().len(), 2);
        assert!(pipeline.frame_by_id(ia_id).is_some());
        assert!(pipeline.frame_by_id(cp_id).is_some());
    }

    #[test]
    fn test_total_token_estimate() {
        let mut pipeline = make_pipeline();

        let mut f1 = ContextFrame::new(0, FrameKind::StepSummary, "s1");
        f1.set_token_estimate(Some(100));
        pipeline.push_frame_raw(f1);

        let mut f2 = ContextFrame::new(0, FrameKind::StepSummary, "s2");
        f2.set_token_estimate(Some(250));
        pipeline.push_frame_raw(f2);

        // Frame without token estimate
        pipeline.push_frame(FrameKind::Checkpoint, "cp");

        assert_eq!(pipeline.total_token_estimate(), 350);
    }

    #[test]
    fn test_serialization_roundtrip() {
        let mut pipeline = make_pipeline();
        pipeline.set_global_summary(Some("Overall progress summary".to_string()));

        let payload = serde_json::json!({"files": ["a.rs", "b.rs"]});
        let mut frame = ContextFrame::new(0, FrameKind::StepSummary, "did stuff");
        frame.set_token_estimate(Some(150));
        frame.set_data(Some(payload.clone()));
        pipeline.push_frame_raw(frame);

        let data = pipeline.to_data().expect("serialize");
        let mut restored =
            ContextPipeline::from_bytes(&data, ObjectHash::default()).expect("deserialize");

        assert_eq!(restored.frames().len(), 1);
        assert_eq!(restored.frames()[0].frame_id(), 0);
        assert_eq!(restored.frames()[0].kind(), &FrameKind::StepSummary);
        assert_eq!(restored.frames()[0].summary(), "did stuff");
        assert_eq!(restored.frames()[0].data(), Some(&payload));
        assert_eq!(restored.frames()[0].token_estimate(), Some(150));
        assert_eq!(restored.global_summary(), Some("Overall progress summary"));

        // The ID counter must survive the round-trip so IDs are not reused.
        assert_eq!(
            restored.push_frame(FrameKind::CodeChange, "after restore"),
            1
        );
    }

    #[test]
    fn test_intent_analysis_frame_survives_eviction() {
        let mut pipeline = make_pipeline();
        pipeline.set_max_frames(2);

        // Seed with IntentAnalysis (protected)
        let ia_id = pipeline.push_frame(FrameKind::IntentAnalysis, "AI analysis of user intent");
        let s1_id = pipeline.push_frame(FrameKind::StepSummary, "step 1");
        assert_eq!(pipeline.frames().len(), 2);

        // Adding another frame should evict "step 1", not IntentAnalysis
        pipeline.push_frame(FrameKind::CodeChange, "code change");
        assert_eq!(pipeline.frames().len(), 2);
        assert!(pipeline.frame_by_id(ia_id).is_some());
        assert!(pipeline.frame_by_id(s1_id).is_none()); // evicted
        assert_eq!(pipeline.frames()[0].kind(), &FrameKind::IntentAnalysis);
        assert_eq!(pipeline.frames()[1].summary(), "code change");
    }

    #[test]
    fn test_frame_kind_text_and_wire_names() {
        assert_eq!(FrameKind::ToolCall.to_string(), "tool_call");
        assert_eq!(FrameKind::Other("custom".to_string()).as_str(), "custom");

        // Unit variants serialize as their snake_case name.
        assert_eq!(
            serde_json::to_value(FrameKind::ErrorRecovery).expect("serialize"),
            serde_json::json!("error_recovery")
        );

        // `Other` carries its label under the `other` key.
        let other: FrameKind =
            serde_json::from_value(serde_json::json!({"other": "custom"})).expect("deserialize");
        assert_eq!(other, FrameKind::Other("custom".to_string()));
    }
}
586}