Skip to main content

git_internal/internal/object/
plan_step_event.rs

1//! Plan-step execution event.
2//!
3//! `PlanStepEvent` is the runtime bridge between immutable planning
4//! structure and actual execution.
5//!
6//! # How to use this object
7//!
8//! - Append a new event whenever a logical plan step changes execution
9//!   state inside a run.
10//! - Use `consumed_frames` and `produced_frames` to document context
11//!   flow.
12//! - Set `spawned_task_id` when the step delegates work to a durable
13//!   `Task`.
14//! - Set `outputs` when the step produced structured runtime output.
15//!
16//! # How it works with other objects
17//!
18//! - `plan_id` points to the immutable `Plan` revision.
19//! - `step_id` points to the stable `PlanStep.step_id`.
20//! - `run_id` ties the execution fact to the specific attempt.
21//! - `ContextFrame` IDs describe what context the step consumed and
22//!   produced.
23//!
24//! # How Libra should call it
25//!
26//! Libra should reconstruct current step state from the ordered
27//! `PlanStepEvent` stream rather than mutating `PlanStep` inside the
28//! stored plan snapshot.
29
30use std::fmt;
31
32use serde::{Deserialize, Serialize};
33use uuid::Uuid;
34
35use crate::{
36    errors::GitError,
37    hash::ObjectHash,
38    internal::object::{
39        ObjectTrait,
40        types::{ActorRef, Header, ObjectType},
41    },
42};
43
/// Runtime status recorded on a `PlanStepEvent`.
///
/// Variants serialize as their snake_case names (see `rename_all`
/// below); `PlanStepStatus::as_str` returns the same spelling for
/// display and storage.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
#[serde(rename_all = "snake_case")]
pub enum PlanStepStatus {
    /// Serialized as `"pending"`.
    Pending,
    /// Serialized as `"progressing"`.
    Progressing,
    /// Serialized as `"completed"`.
    Completed,
    /// Serialized as `"failed"`.
    Failed,
    /// Serialized as `"skipped"`.
    Skipped,
}
53
54impl PlanStepStatus {
55    /// Return the canonical snake_case storage/display form for the step
56    /// status.
57    pub fn as_str(&self) -> &'static str {
58        match self {
59            PlanStepStatus::Pending => "pending",
60            PlanStepStatus::Progressing => "progressing",
61            PlanStepStatus::Completed => "completed",
62            PlanStepStatus::Failed => "failed",
63            PlanStepStatus::Skipped => "skipped",
64        }
65    }
66}
67
68impl fmt::Display for PlanStepStatus {
69    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
70        write!(f, "{}", self.as_str())
71    }
72}
73
/// Append-only execution fact for one logical plan step in one `Run`.
///
/// The pair `(plan_id, step_id)` identifies the logical step revision,
/// while `run_id` identifies the execution attempt that produced this
/// event.
///
/// # Serialized form
///
/// - The header is flattened, so its fields appear at the same level as
///   the event's own fields rather than under a nested key.
/// - Optional fields (`reason`, `spawned_task_id`, `outputs`) are
///   omitted when `None`, and the frame lists are omitted when empty;
///   on deserialization a missing field falls back to its default.
/// - Unknown keys are rejected via `deny_unknown_fields`.
///
/// NOTE(review): the serde documentation lists `deny_unknown_fields` as
/// unsupported in combination with `flatten` — confirm that rejection of
/// unknown keys behaves as intended for this type.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(deny_unknown_fields)]
pub struct PlanStepEvent {
    /// Common object header carrying the immutable object id, type,
    /// creator, and timestamps.
    #[serde(flatten)]
    header: Header,
    /// Immutable plan revision that owns the referenced logical step.
    plan_id: Uuid,
    /// Stable logical step id inside the owning plan family.
    step_id: Uuid,
    /// Concrete execution attempt that produced this step event.
    run_id: Uuid,
    /// Runtime status recorded for the step at this point in the run.
    status: PlanStepStatus,
    /// Optional human-readable explanation for this status transition.
    ///
    /// Omitted from the serialized form when `None`.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    reason: Option<String>,
    /// Context frame ids consumed while executing the step.
    ///
    /// Omitted from the serialized form when empty.
    #[serde(default, skip_serializing_if = "Vec::is_empty")]
    consumed_frames: Vec<Uuid>,
    /// Context frame ids produced while executing the step.
    ///
    /// Omitted from the serialized form when empty.
    #[serde(default, skip_serializing_if = "Vec::is_empty")]
    produced_frames: Vec<Uuid>,
    /// Optional durable task spawned from this step.
    ///
    /// Omitted from the serialized form when `None`.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    spawned_task_id: Option<Uuid>,
    /// Optional structured runtime outputs produced by the step.
    ///
    /// Omitted from the serialized form when `None`.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    outputs: Option<serde_json::Value>,
}
110
111impl PlanStepEvent {
112    /// Create a new execution event for one logical plan step inside one
113    /// run.
114    pub fn new(
115        created_by: ActorRef,
116        plan_id: Uuid,
117        step_id: Uuid,
118        run_id: Uuid,
119        status: PlanStepStatus,
120    ) -> Result<Self, String> {
121        Ok(Self {
122            header: Header::new(ObjectType::PlanStepEvent, created_by)?,
123            plan_id,
124            step_id,
125            run_id,
126            status,
127            reason: None,
128            consumed_frames: Vec::new(),
129            produced_frames: Vec::new(),
130            spawned_task_id: None,
131            outputs: None,
132        })
133    }
134
135    /// Return the immutable header for this event.
136    pub fn header(&self) -> &Header {
137        &self.header
138    }
139
140    /// Return the owning plan revision id.
141    pub fn plan_id(&self) -> Uuid {
142        self.plan_id
143    }
144
145    /// Return the stable logical step id.
146    pub fn step_id(&self) -> Uuid {
147        self.step_id
148    }
149
150    /// Return the concrete execution attempt id.
151    pub fn run_id(&self) -> Uuid {
152        self.run_id
153    }
154
155    /// Return the recorded runtime status.
156    pub fn status(&self) -> &PlanStepStatus {
157        &self.status
158    }
159
160    /// Return the human-readable explanation, if present.
161    pub fn reason(&self) -> Option<&str> {
162        self.reason.as_deref()
163    }
164
165    /// Return the context frame ids consumed by the step.
166    pub fn consumed_frames(&self) -> &[Uuid] {
167        &self.consumed_frames
168    }
169
170    /// Return the context frame ids produced by the step.
171    pub fn produced_frames(&self) -> &[Uuid] {
172        &self.produced_frames
173    }
174
175    /// Return the durable spawned task id, if present.
176    pub fn spawned_task_id(&self) -> Option<Uuid> {
177        self.spawned_task_id
178    }
179
180    /// Return the structured runtime outputs, if present.
181    pub fn outputs(&self) -> Option<&serde_json::Value> {
182        self.outputs.as_ref()
183    }
184
185    /// Set or clear the human-readable explanation.
186    pub fn set_reason(&mut self, reason: Option<String>) {
187        self.reason = reason;
188    }
189
190    /// Replace the consumed context frame set.
191    pub fn set_consumed_frames(&mut self, consumed_frames: Vec<Uuid>) {
192        self.consumed_frames = consumed_frames;
193    }
194
195    /// Replace the produced context frame set.
196    pub fn set_produced_frames(&mut self, produced_frames: Vec<Uuid>) {
197        self.produced_frames = produced_frames;
198    }
199
200    /// Set or clear the durable spawned task id.
201    pub fn set_spawned_task_id(&mut self, spawned_task_id: Option<Uuid>) {
202        self.spawned_task_id = spawned_task_id;
203    }
204
205    /// Set or clear the structured runtime outputs.
206    pub fn set_outputs(&mut self, outputs: Option<serde_json::Value>) {
207        self.outputs = outputs;
208    }
209}
210
211impl fmt::Display for PlanStepEvent {
212    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
213        write!(f, "PlanStepEvent: {}", self.header.object_id())
214    }
215}
216
217impl ObjectTrait for PlanStepEvent {
218    fn from_bytes(data: &[u8], _hash: ObjectHash) -> Result<Self, GitError>
219    where
220        Self: Sized,
221    {
222        serde_json::from_slice(data).map_err(|e| GitError::InvalidObjectInfo(e.to_string()))
223    }
224
225    fn get_type(&self) -> ObjectType {
226        ObjectType::PlanStepEvent
227    }
228
229    fn get_size(&self) -> usize {
230        match serde_json::to_vec(self) {
231            Ok(v) => v.len(),
232            Err(e) => {
233                tracing::warn!("failed to compute PlanStepEvent size: {}", e);
234                0
235            }
236        }
237    }
238
239    fn to_data(&self) -> Result<Vec<u8>, GitError> {
240        serde_json::to_vec(self).map_err(|e| GitError::InvalidObjectInfo(e.to_string()))
241    }
242}
243
#[cfg(test)]
mod tests {
    use super::*;

    // Coverage:
    // - completed step-event creation, including plan/step/run ids
    // - optional fields are unset on a freshly created event
    // - consumed/produced context frame flow
    // - spawned task linkage and structured outputs
    // - status spelling agrees across `as_str`, `Display`, and serde
    // - JSON serialization round-trip and `get_size` consistency

    /// Create a `Completed` event with fixed plan/step/run ids
    /// (`0x1`, `0x2`, `0x3`) so assertions can refer to known values.
    fn completed_event() -> PlanStepEvent {
        let actor = ActorRef::agent("planner").expect("actor");
        PlanStepEvent::new(
            actor,
            Uuid::from_u128(0x1),
            Uuid::from_u128(0x2),
            Uuid::from_u128(0x3),
            PlanStepStatus::Completed,
        )
        .expect("event")
    }

    #[test]
    fn test_plan_step_event_fields() {
        let mut event = completed_event();
        let frame_a = Uuid::from_u128(0x10);
        let frame_b = Uuid::from_u128(0x11);
        let task_id = Uuid::from_u128(0x20);
        let outputs = serde_json::json!({"files": ["src/lib.rs"]});

        event.set_reason(Some("done".to_string()));
        event.set_consumed_frames(vec![frame_a]);
        event.set_produced_frames(vec![frame_b]);
        event.set_spawned_task_id(Some(task_id));
        event.set_outputs(Some(outputs.clone()));

        assert_eq!(event.plan_id(), Uuid::from_u128(0x1));
        assert_eq!(event.step_id(), Uuid::from_u128(0x2));
        assert_eq!(event.run_id(), Uuid::from_u128(0x3));
        assert_eq!(event.status(), &PlanStepStatus::Completed);
        assert_eq!(event.reason(), Some("done"));
        assert_eq!(event.consumed_frames(), &[frame_a]);
        assert_eq!(event.produced_frames(), &[frame_b]);
        assert_eq!(event.spawned_task_id(), Some(task_id));
        // Previously set but never verified.
        assert_eq!(event.outputs(), Some(&outputs));
    }

    #[test]
    fn test_plan_step_event_defaults_and_clearing() {
        let mut event = completed_event();

        // A fresh event carries no optional data.
        assert_eq!(event.reason(), None);
        assert!(event.consumed_frames().is_empty());
        assert!(event.produced_frames().is_empty());
        assert_eq!(event.spawned_task_id(), None);
        assert_eq!(event.outputs(), None);

        // Setters accept `None` to clear a previously stored value.
        event.set_reason(Some("temporary".to_string()));
        event.set_reason(None);
        assert_eq!(event.reason(), None);
    }

    #[test]
    fn test_plan_step_status_string_forms() {
        let cases = [
            (PlanStepStatus::Pending, "pending"),
            (PlanStepStatus::Progressing, "progressing"),
            (PlanStepStatus::Completed, "completed"),
            (PlanStepStatus::Failed, "failed"),
            (PlanStepStatus::Skipped, "skipped"),
        ];

        for (status, expected) in cases {
            assert_eq!(status.as_str(), expected);
            assert_eq!(status.to_string(), expected);

            // The serde spelling must match `as_str`.
            let json = serde_json::to_string(&status).expect("serialize status");
            assert_eq!(json, format!("\"{}\"", expected));
            let decoded: PlanStepStatus = serde_json::from_str(&json).expect("deserialize status");
            assert_eq!(decoded, status);
        }
    }

    #[test]
    fn test_plan_step_event_serialization_round_trip() {
        let mut event = completed_event();
        let frame = Uuid::from_u128(0x10);
        event.set_reason(Some("done".to_string()));
        event.set_consumed_frames(vec![frame]);

        let bytes = event.to_data().expect("serialize event");
        assert_eq!(event.get_size(), bytes.len());
        assert_eq!(event.get_type(), ObjectType::PlanStepEvent);

        let decoded: PlanStepEvent = serde_json::from_slice(&bytes).expect("deserialize event");
        assert_eq!(decoded.plan_id(), event.plan_id());
        assert_eq!(decoded.step_id(), event.step_id());
        assert_eq!(decoded.run_id(), event.run_id());
        assert_eq!(decoded.status(), event.status());
        assert_eq!(decoded.reason(), Some("done"));
        assert_eq!(decoded.consumed_frames(), &[frame]);
        assert!(decoded.produced_frames().is_empty());
        assert_eq!(decoded.spawned_task_id(), None);
        assert_eq!(decoded.outputs(), None);
    }
}