Skip to main content

ainl_trajectory/
lib.rs

1//! Execution trajectory helpers for the self-learning stack.
2//!
3//! Hosts (`openfang-runtime`, `ainl-runtime`, MCP tooling) share [`TrajectoryDraft`] and
4//! [`replay::TrajectoryReplayLine`] JSONL for exports; persistence lives in `ainl-memory`.
5
6#![forbid(unsafe_code)]
7
8pub mod replay;
9
10pub use ainl_contracts::{TrajectoryOutcome, TrajectoryStep};
11pub use replay::{
12    parse_jsonl, trajectory_replay_line, TrajectoryReplayLine, TRAJECTORY_REPLAY_SCHEMA_VERSION,
13};
14
15use std::collections::{BTreeMap, BTreeSet};
16
17use ainl_contracts::{
18    ContextFreshness, ExperienceBundle, ExperienceEvent, ImpactDecision, LEARNER_SCHEMA_VERSION,
19};
20use serde::{Deserialize, Serialize};
21use uuid::Uuid;
22
23/// In-memory trajectory being assembled before commit to `ainl_memory` as [`ainl_memory::TrajectoryNode`].
24#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
25pub struct TrajectoryDraft {
26    pub episode_id: Uuid,
27    pub session_id: String,
28    pub project_id: Option<String>,
29    pub ainl_source_hash: Option<String>,
30    pub outcome: TrajectoryOutcome,
31    pub steps: Vec<TrajectoryStep>,
32    pub duration_ms: u64,
33    #[serde(default, skip_serializing_if = "Option::is_none")]
34    pub frame_vars: Option<serde_json::Value>,
35    #[serde(default, skip_serializing_if = "Option::is_none")]
36    pub fitness_delta: Option<f32>,
37}
38
39impl TrajectoryDraft {
40    #[must_use]
41    pub fn new(episode_id: Uuid, outcome: TrajectoryOutcome) -> Self {
42        Self {
43            episode_id,
44            session_id: String::new(),
45            project_id: None,
46            ainl_source_hash: None,
47            outcome,
48            steps: Vec::new(),
49            duration_ms: 0,
50            frame_vars: None,
51            fitness_delta: None,
52        }
53    }
54
55    pub fn push_step(&mut self, step: TrajectoryStep) {
56        self.steps.push(step);
57    }
58}
59
60#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
61pub struct ExperienceCluster {
62    pub fingerprint: String,
63    pub intent: String,
64    pub outcome: TrajectoryOutcome,
65    pub trajectories: Vec<TrajectoryDraft>,
66    #[serde(default)]
67    pub success_ratio: f32,
68    #[serde(default)]
69    pub avg_duration_ms: u64,
70    #[serde(default)]
71    pub avg_vitals_trust: Option<f32>,
72}
73
/// Thresholds that decide which trajectories are clustered and which clusters survive.
#[derive(Clone, Debug)]
pub struct ClusterPolicy {
    /// Clusters whose success ratio is below this value are dropped.
    pub min_success_ratio: f32,
    /// Trajectories with fewer steps than this are ignored before clustering.
    pub min_steps: usize,
    /// When set, clusters whose average duration (ms) exceeds this value are dropped.
    pub max_avg_duration_ms: Option<u64>,
    /// When set, clusters whose average vitals trust is below this value are dropped;
    /// clusters without any vitals are not affected by this check.
    pub min_avg_vitals_trust: Option<f32>,
}
81
82impl Default for ClusterPolicy {
83    fn default() -> Self {
84        Self {
85            min_success_ratio: 1.0,
86            min_steps: 1,
87            max_avg_duration_ms: Some(10 * 60 * 1000),
88            min_avg_vitals_trust: Some(0.55),
89        }
90    }
91}
92
93#[must_use]
94pub fn trajectory_fingerprint(record: &TrajectoryDraft) -> String {
95    let mut s = String::new();
96    s.push_str(record.project_id.as_deref().unwrap_or("global"));
97    s.push(':');
98    if let Some(source_hash) = record.ainl_source_hash.as_deref() {
99        s.push_str(source_hash);
100    }
101    for step in &record.steps {
102        s.push('|');
103        s.push_str(&step.adapter);
104        s.push('.');
105        s.push_str(&step.operation);
106    }
107    Uuid::new_v5(&Uuid::NAMESPACE_OID, s.as_bytes()).to_string()
108}
109
110#[must_use]
111pub fn cluster_experiences(records: &[TrajectoryDraft]) -> Vec<ExperienceCluster> {
112    cluster_experiences_with_policy(records, &ClusterPolicy::default())
113}
114
115#[must_use]
116pub fn cluster_experiences_with_policy(
117    records: &[TrajectoryDraft],
118    policy: &ClusterPolicy,
119) -> Vec<ExperienceCluster> {
120    let mut clusters: BTreeMap<String, ExperienceCluster> = BTreeMap::new();
121    for record in records {
122        if record.steps.len() < policy.min_steps {
123            continue;
124        }
125        let fingerprint = trajectory_fingerprint(record);
126        let entry = clusters
127            .entry(fingerprint.clone())
128            .or_insert_with(|| ExperienceCluster {
129                fingerprint,
130                intent: record
131                    .project_id
132                    .clone()
133                    .unwrap_or_else(|| "repeated agent workflow".into()),
134                outcome: record.outcome,
135                trajectories: Vec::new(),
136                success_ratio: 0.0,
137                avg_duration_ms: 0,
138                avg_vitals_trust: None,
139            });
140        entry.trajectories.push(record.clone());
141    }
142    clusters
143        .into_values()
144        .filter_map(|mut cluster| {
145            enrich_cluster_stats(&mut cluster);
146            if cluster.success_ratio < policy.min_success_ratio {
147                return None;
148            }
149            if let Some(max_duration) = policy.max_avg_duration_ms {
150                if cluster.avg_duration_ms > max_duration {
151                    return None;
152                }
153            }
154            if let (Some(min_trust), Some(avg_trust)) =
155                (policy.min_avg_vitals_trust, cluster.avg_vitals_trust)
156            {
157                if avg_trust < min_trust {
158                    return None;
159                }
160            }
161            Some(cluster)
162        })
163        .collect()
164}
165
166fn enrich_cluster_stats(cluster: &mut ExperienceCluster) {
167    let count = cluster.trajectories.len().max(1);
168    let success = cluster
169        .trajectories
170        .iter()
171        .filter(|t| t.outcome == TrajectoryOutcome::Success && t.steps.iter().all(|s| s.success))
172        .count();
173    cluster.success_ratio = success as f32 / count as f32;
174    cluster.avg_duration_ms = cluster
175        .trajectories
176        .iter()
177        .map(|t| t.duration_ms)
178        .sum::<u64>()
179        / count as u64;
180    let trusts = cluster
181        .trajectories
182        .iter()
183        .flat_map(|t| t.steps.iter())
184        .filter_map(|step| step.vitals.as_ref().map(|v| v.trust))
185        .collect::<Vec<_>>();
186    if !trusts.is_empty() {
187        cluster.avg_vitals_trust = Some(trusts.iter().sum::<f32>() / trusts.len() as f32);
188    }
189}
190
191#[must_use]
192pub fn build_experience_bundle(cluster: &ExperienceCluster) -> ExperienceBundle {
193    let observation_count = cluster.trajectories.len() as u32;
194    let source_trajectory_ids = cluster
195        .trajectories
196        .iter()
197        .map(|t| t.episode_id.to_string())
198        .collect::<Vec<_>>();
199    let events = cluster
200        .trajectories
201        .first()
202        .map(|t| {
203            t.steps
204                .iter()
205                .map(ExperienceEvent::from)
206                .collect::<Vec<_>>()
207        })
208        .unwrap_or_default();
209    let fitness = if observation_count == 0 {
210        0.0
211    } else {
212        let base = cluster.success_ratio;
213        let delta = cluster
214            .trajectories
215            .iter()
216            .filter_map(|t| t.fitness_delta)
217            .sum::<f32>();
218        (base + (delta / observation_count as f32)).clamp(0.0, 1.0)
219    };
220    ExperienceBundle {
221        schema_version: LEARNER_SCHEMA_VERSION,
222        bundle_id: format!("experience:{}", cluster.fingerprint),
223        agent_id: "unknown".into(),
224        intent: cluster.intent.clone(),
225        outcome: cluster.outcome,
226        host_outcome: None,
227        observation_count,
228        fitness,
229        events,
230        source_trajectory_ids,
231        source_failure_ids: Vec::new(),
232        freshness: ContextFreshness::Unknown,
233        impact_decision: ImpactDecision::AllowExecute,
234    }
235}
236
237#[must_use]
238pub fn stable_tool_sequence(records: &[TrajectoryDraft]) -> Vec<String> {
239    let mut seqs = records
240        .iter()
241        .map(|record| {
242            record
243                .steps
244                .iter()
245                .map(|step| step.operation.clone())
246                .collect::<Vec<_>>()
247        })
248        .collect::<BTreeSet<_>>();
249    seqs.pop_first().unwrap_or_default()
250}
251
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds the single-step fixture shared by these tests: step id `a`, timestamp 1,
    /// duration 3 ms, and no previews, vitals, freshness, frame vars or telemetry.
    fn fixture_step(
        adapter: &'static str,
        operation: &'static str,
        success: bool,
        error: Option<&'static str>,
    ) -> TrajectoryStep {
        TrajectoryStep {
            step_id: "a".into(),
            timestamp_ms: 1,
            adapter: adapter.into(),
            operation: operation.into(),
            inputs_preview: None,
            outputs_preview: None,
            duration_ms: 3,
            success,
            error: error.map(|message| message.into()),
            vitals: None,
            freshness_at_step: None,
            frame_vars: None,
            tool_telemetry: None,
        }
    }

    #[test]
    fn draft_roundtrip_json() {
        let mut draft = TrajectoryDraft::new(Uuid::nil(), TrajectoryOutcome::Success);
        draft.duration_ms = 42;
        draft.push_step(fixture_step("http", "GET", true, None));

        let encoded = serde_json::to_string(&draft).unwrap();
        let decoded: TrajectoryDraft = serde_json::from_str(&encoded).unwrap();
        assert_eq!(draft, decoded);
    }

    #[test]
    fn clusters_repeated_trajectories_into_experience_bundle() {
        let mut first = TrajectoryDraft::new(Uuid::new_v4(), TrajectoryOutcome::Success);
        first.project_id = Some("review".into());
        first.push_step(fixture_step("tool", "file_read", true, None));
        let mut second = first.clone();
        second.episode_id = Uuid::new_v4();

        let clusters = cluster_experiences(&[first, second]);
        assert_eq!(clusters.len(), 1);

        let bundle = build_experience_bundle(&clusters[0]);
        assert_eq!(bundle.observation_count, 2);
        assert_eq!(bundle.events.len(), 1);
        assert_eq!(bundle.fitness, 1.0);
    }

    #[test]
    fn clustering_suppresses_failed_or_sparse_trajectories() {
        let mut failed = TrajectoryDraft::new(Uuid::new_v4(), TrajectoryOutcome::Failure);
        failed.project_id = Some("review".into());
        failed.push_step(fixture_step("tool", "file_read", false, Some("missing file")));
        assert!(cluster_experiences(&[failed]).is_empty());

        let sparse = TrajectoryDraft::new(Uuid::new_v4(), TrajectoryOutcome::Success);
        let policy = ClusterPolicy {
            min_steps: 1,
            ..ClusterPolicy::default()
        };
        assert!(cluster_experiences_with_policy(&[sparse], &policy).is_empty());
    }
}