1#![forbid(unsafe_code)]
7
8pub mod replay;
9
10pub use ainl_contracts::{TrajectoryOutcome, TrajectoryStep};
11pub use replay::{
12 parse_jsonl, trajectory_replay_line, TrajectoryReplayLine, TRAJECTORY_REPLAY_SCHEMA_VERSION,
13};
14
15use std::collections::{BTreeMap, BTreeSet};
16
17use ainl_contracts::{
18 ContextFreshness, ExperienceBundle, ExperienceEvent, ImpactDecision, LEARNER_SCHEMA_VERSION,
19};
20use serde::{Deserialize, Serialize};
21use uuid::Uuid;
22
23#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
25pub struct TrajectoryDraft {
26 pub episode_id: Uuid,
27 pub session_id: String,
28 pub project_id: Option<String>,
29 pub ainl_source_hash: Option<String>,
30 pub outcome: TrajectoryOutcome,
31 pub steps: Vec<TrajectoryStep>,
32 pub duration_ms: u64,
33 #[serde(default, skip_serializing_if = "Option::is_none")]
34 pub frame_vars: Option<serde_json::Value>,
35 #[serde(default, skip_serializing_if = "Option::is_none")]
36 pub fitness_delta: Option<f32>,
37}
38
39impl TrajectoryDraft {
40 #[must_use]
41 pub fn new(episode_id: Uuid, outcome: TrajectoryOutcome) -> Self {
42 Self {
43 episode_id,
44 session_id: String::new(),
45 project_id: None,
46 ainl_source_hash: None,
47 outcome,
48 steps: Vec::new(),
49 duration_ms: 0,
50 frame_vars: None,
51 fitness_delta: None,
52 }
53 }
54
55 pub fn push_step(&mut self, step: TrajectoryStep) {
56 self.steps.push(step);
57 }
58}
59
60#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
61pub struct ExperienceCluster {
62 pub fingerprint: String,
63 pub intent: String,
64 pub outcome: TrajectoryOutcome,
65 pub trajectories: Vec<TrajectoryDraft>,
66 #[serde(default)]
67 pub success_ratio: f32,
68 #[serde(default)]
69 pub avg_duration_ms: u64,
70 #[serde(default)]
71 pub avg_vitals_trust: Option<f32>,
72}
73
/// Thresholds applied by [`cluster_experiences_with_policy`] when deciding
/// which records and clusters to keep.
#[derive(Debug, Clone)]
pub struct ClusterPolicy {
    /// Clusters whose success ratio is below this value are dropped.
    pub min_success_ratio: f32,
    /// Records with fewer steps than this are ignored before clustering.
    pub min_steps: usize,
    /// When set, clusters whose average duration exceeds this value are
    /// dropped.
    pub max_avg_duration_ms: Option<u64>,
    /// When set, clusters whose average vitals trust is below this value are
    /// dropped. Clusters without any trust sample are not affected.
    pub min_avg_vitals_trust: Option<f32>,
}

impl Default for ClusterPolicy {
    /// Strict defaults: every member must succeed, at least one step is
    /// required, the average duration is capped at `600_000` (ten minutes when
    /// read as milliseconds) and the average trust must reach `0.55`.
    fn default() -> Self {
        const MAX_AVG_DURATION_MS: u64 = 10 * 60 * 1000;
        const MIN_AVG_VITALS_TRUST: f32 = 0.55;

        Self {
            min_success_ratio: 1.0,
            min_steps: 1,
            max_avg_duration_ms: Some(MAX_AVG_DURATION_MS),
            min_avg_vitals_trust: Some(MIN_AVG_VITALS_TRUST),
        }
    }
}
92
93#[must_use]
94pub fn trajectory_fingerprint(record: &TrajectoryDraft) -> String {
95 let mut s = String::new();
96 s.push_str(record.project_id.as_deref().unwrap_or("global"));
97 s.push(':');
98 if let Some(source_hash) = record.ainl_source_hash.as_deref() {
99 s.push_str(source_hash);
100 }
101 for step in &record.steps {
102 s.push('|');
103 s.push_str(&step.adapter);
104 s.push('.');
105 s.push_str(&step.operation);
106 }
107 Uuid::new_v5(&Uuid::NAMESPACE_OID, s.as_bytes()).to_string()
108}
109
110#[must_use]
111pub fn cluster_experiences(records: &[TrajectoryDraft]) -> Vec<ExperienceCluster> {
112 cluster_experiences_with_policy(records, &ClusterPolicy::default())
113}
114
115#[must_use]
116pub fn cluster_experiences_with_policy(
117 records: &[TrajectoryDraft],
118 policy: &ClusterPolicy,
119) -> Vec<ExperienceCluster> {
120 let mut clusters: BTreeMap<String, ExperienceCluster> = BTreeMap::new();
121 for record in records {
122 if record.steps.len() < policy.min_steps {
123 continue;
124 }
125 let fingerprint = trajectory_fingerprint(record);
126 let entry = clusters
127 .entry(fingerprint.clone())
128 .or_insert_with(|| ExperienceCluster {
129 fingerprint,
130 intent: record
131 .project_id
132 .clone()
133 .unwrap_or_else(|| "repeated agent workflow".into()),
134 outcome: record.outcome,
135 trajectories: Vec::new(),
136 success_ratio: 0.0,
137 avg_duration_ms: 0,
138 avg_vitals_trust: None,
139 });
140 entry.trajectories.push(record.clone());
141 }
142 clusters
143 .into_values()
144 .filter_map(|mut cluster| {
145 enrich_cluster_stats(&mut cluster);
146 if cluster.success_ratio < policy.min_success_ratio {
147 return None;
148 }
149 if let Some(max_duration) = policy.max_avg_duration_ms {
150 if cluster.avg_duration_ms > max_duration {
151 return None;
152 }
153 }
154 if let (Some(min_trust), Some(avg_trust)) =
155 (policy.min_avg_vitals_trust, cluster.avg_vitals_trust)
156 {
157 if avg_trust < min_trust {
158 return None;
159 }
160 }
161 Some(cluster)
162 })
163 .collect()
164}
165
166fn enrich_cluster_stats(cluster: &mut ExperienceCluster) {
167 let count = cluster.trajectories.len().max(1);
168 let success = cluster
169 .trajectories
170 .iter()
171 .filter(|t| t.outcome == TrajectoryOutcome::Success && t.steps.iter().all(|s| s.success))
172 .count();
173 cluster.success_ratio = success as f32 / count as f32;
174 cluster.avg_duration_ms = cluster
175 .trajectories
176 .iter()
177 .map(|t| t.duration_ms)
178 .sum::<u64>()
179 / count as u64;
180 let trusts = cluster
181 .trajectories
182 .iter()
183 .flat_map(|t| t.steps.iter())
184 .filter_map(|step| step.vitals.as_ref().map(|v| v.trust))
185 .collect::<Vec<_>>();
186 if !trusts.is_empty() {
187 cluster.avg_vitals_trust = Some(trusts.iter().sum::<f32>() / trusts.len() as f32);
188 }
189}
190
191#[must_use]
192pub fn build_experience_bundle(cluster: &ExperienceCluster) -> ExperienceBundle {
193 let observation_count = cluster.trajectories.len() as u32;
194 let source_trajectory_ids = cluster
195 .trajectories
196 .iter()
197 .map(|t| t.episode_id.to_string())
198 .collect::<Vec<_>>();
199 let events = cluster
200 .trajectories
201 .first()
202 .map(|t| {
203 t.steps
204 .iter()
205 .map(ExperienceEvent::from)
206 .collect::<Vec<_>>()
207 })
208 .unwrap_or_default();
209 let fitness = if observation_count == 0 {
210 0.0
211 } else {
212 let base = cluster.success_ratio;
213 let delta = cluster
214 .trajectories
215 .iter()
216 .filter_map(|t| t.fitness_delta)
217 .sum::<f32>();
218 (base + (delta / observation_count as f32)).clamp(0.0, 1.0)
219 };
220 ExperienceBundle {
221 schema_version: LEARNER_SCHEMA_VERSION,
222 bundle_id: format!("experience:{}", cluster.fingerprint),
223 agent_id: "unknown".into(),
224 intent: cluster.intent.clone(),
225 outcome: cluster.outcome,
226 host_outcome: None,
227 observation_count,
228 fitness,
229 events,
230 source_trajectory_ids,
231 source_failure_ids: Vec::new(),
232 freshness: ContextFreshness::Unknown,
233 impact_decision: ImpactDecision::AllowExecute,
234 }
235}
236
237#[must_use]
238pub fn stable_tool_sequence(records: &[TrajectoryDraft]) -> Vec<String> {
239 let mut seqs = records
240 .iter()
241 .map(|record| {
242 record
243 .steps
244 .iter()
245 .map(|step| step.operation.clone())
246 .collect::<Vec<_>>()
247 })
248 .collect::<BTreeSet<_>>();
249 seqs.pop_first().unwrap_or_default()
250}
251
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a step with fixed id, timestamp and duration and no optional
    /// payloads; only the adapter, operation and result vary between tests.
    fn step(
        adapter: &'static str,
        operation: &'static str,
        success: bool,
        error: Option<&'static str>,
    ) -> TrajectoryStep {
        TrajectoryStep {
            step_id: "a".into(),
            timestamp_ms: 1,
            adapter: adapter.into(),
            operation: operation.into(),
            inputs_preview: None,
            outputs_preview: None,
            duration_ms: 3,
            success,
            error: error.map(|message| message.into()),
            vitals: None,
            freshness_at_step: None,
            frame_vars: None,
            tool_telemetry: None,
        }
    }

    /// Builds a single-step draft in the `"review"` project with a fresh id.
    fn review_draft(outcome: TrajectoryOutcome, only_step: TrajectoryStep) -> TrajectoryDraft {
        let mut draft = TrajectoryDraft::new(Uuid::new_v4(), outcome);
        draft.project_id = Some("review".into());
        draft.steps.push(only_step);
        draft
    }

    #[test]
    fn draft_roundtrip_json() {
        let mut original = TrajectoryDraft::new(Uuid::nil(), TrajectoryOutcome::Success);
        original.duration_ms = 42;
        original.steps.push(step("http", "GET", true, None));

        let encoded = serde_json::to_string(&original).unwrap();
        let decoded: TrajectoryDraft = serde_json::from_str(&encoded).unwrap();
        assert_eq!(original, decoded);
    }

    #[test]
    fn clusters_repeated_trajectories_into_experience_bundle() {
        let first = review_draft(
            TrajectoryOutcome::Success,
            step("tool", "file_read", true, None),
        );
        // Same workflow, different episode.
        let mut second = first.clone();
        second.episode_id = Uuid::new_v4();

        let clusters = cluster_experiences(&[first, second]);
        assert_eq!(clusters.len(), 1);

        let bundle = build_experience_bundle(&clusters[0]);
        assert_eq!(bundle.observation_count, 2);
        assert_eq!(bundle.events.len(), 1);
        assert_eq!(bundle.fitness, 1.0);
    }

    #[test]
    fn clustering_suppresses_failed_or_sparse_trajectories() {
        let failed = review_draft(
            TrajectoryOutcome::Failure,
            step("tool", "file_read", false, Some("missing file")),
        );
        assert!(cluster_experiences(&[failed]).is_empty());

        // A draft without steps falls below the one-step minimum.
        let sparse = TrajectoryDraft::new(Uuid::new_v4(), TrajectoryOutcome::Success);
        let policy = ClusterPolicy {
            min_steps: 1,
            ..ClusterPolicy::default()
        };
        assert!(cluster_experiences_with_policy(&[sparse], &policy).is_empty());
    }
}