1use serde_json::{json, Map, Value};
56
57use crate::intent::{IntentId, SessionId};
58use crate::op_log::OpLog;
59use crate::operation::{OpId, OperationRecord};
60
61#[derive(Debug, Clone, PartialEq, Eq)]
69pub enum Predicate {
70 All,
75 Intent { intent_id: IntentId },
77 Session { session_id: SessionId },
83 AncestorOf { op_id: OpId },
86 And(Vec<Predicate>),
88 Or(Vec<Predicate>),
91 Not(Box<Predicate>),
98}
99
100impl Predicate {
109 pub fn to_value(&self) -> Value {
113 match self {
114 Predicate::All => json!({"predicate": "all"}),
115 Predicate::Intent { intent_id } => json!({
116 "predicate": "intent",
117 "intent_id": intent_id,
118 }),
119 Predicate::Session { session_id } => json!({
120 "predicate": "session",
121 "session_id": session_id,
122 }),
123 Predicate::AncestorOf { op_id } => json!({
124 "predicate": "ancestor_of",
125 "op_id": op_id,
126 }),
127 Predicate::And(ps) => {
128 let arr: Vec<Value> = ps.iter().map(|p| p.to_value()).collect();
129 json!({"predicate": "and", "clauses": arr})
130 }
131 Predicate::Or(ps) => {
132 let arr: Vec<Value> = ps.iter().map(|p| p.to_value()).collect();
133 json!({"predicate": "or", "clauses": arr})
134 }
135 Predicate::Not(p) => json!({
136 "predicate": "not",
137 "clause": p.to_value(),
138 }),
139 }
140 }
141
142 pub fn from_value(v: &Value) -> Result<Self, String> {
146 let obj: &Map<String, Value> = v
147 .as_object()
148 .ok_or_else(|| "predicate must be a JSON object".to_string())?;
149 let tag = obj
150 .get("predicate")
151 .and_then(|t| t.as_str())
152 .ok_or_else(|| "predicate object missing 'predicate' tag".to_string())?;
153 match tag {
154 "all" => Ok(Predicate::All),
155 "intent" => {
156 let id = obj
157 .get("intent_id")
158 .and_then(|x| x.as_str())
159 .ok_or_else(|| "intent: missing intent_id".to_string())?
160 .to_string();
161 Ok(Predicate::Intent { intent_id: id })
162 }
163 "session" => {
164 let id = obj
165 .get("session_id")
166 .and_then(|x| x.as_str())
167 .ok_or_else(|| "session: missing session_id".to_string())?
168 .to_string();
169 Ok(Predicate::Session { session_id: id })
170 }
171 "ancestor_of" => {
172 let id = obj
173 .get("op_id")
174 .and_then(|x| x.as_str())
175 .ok_or_else(|| "ancestor_of: missing op_id".to_string())?
176 .to_string();
177 Ok(Predicate::AncestorOf { op_id: id })
178 }
179 "and" | "or" => {
180 let arr = obj
181 .get("clauses")
182 .and_then(|x| x.as_array())
183 .ok_or_else(|| format!("{tag}: missing 'clauses' array"))?;
184 let mut ps = Vec::with_capacity(arr.len());
185 for item in arr {
186 ps.push(Predicate::from_value(item)?);
187 }
188 Ok(if tag == "and" {
189 Predicate::And(ps)
190 } else {
191 Predicate::Or(ps)
192 })
193 }
194 "not" => {
195 let inner = obj
196 .get("clause")
197 .ok_or_else(|| "not: missing 'clause'".to_string())?;
198 Ok(Predicate::Not(Box::new(Predicate::from_value(inner)?)))
199 }
200 other => Err(format!("unknown predicate tag: {other}")),
201 }
202 }
203
204 pub fn to_json_string(&self) -> String {
206 self.to_value().to_string()
207 }
208
209 pub fn from_json_str(s: &str) -> Result<Self, String> {
211 let v: Value = serde_json::from_str(s).map_err(|e| e.to_string())?;
212 Self::from_value(&v)
213 }
214}
215
216impl Predicate {
217 fn needs_intent_resolution(&self) -> bool {
221 match self {
222 Predicate::Session { .. } => true,
223 Predicate::And(ps) | Predicate::Or(ps) => {
224 ps.iter().any(|p| p.needs_intent_resolution())
225 }
226 Predicate::Not(p) => p.needs_intent_resolution(),
227 _ => false,
228 }
229 }
230
231 fn candidate_root(&self) -> CandidateRoot {
236 match self {
237 Predicate::AncestorOf { op_id } => CandidateRoot::Ancestry(op_id.clone()),
238 Predicate::And(ps) => {
239 ps.iter()
243 .map(|p| p.candidate_root())
244 .find(|r| matches!(r, CandidateRoot::Ancestry(_)))
245 .unwrap_or(CandidateRoot::All)
246 }
247 _ => CandidateRoot::All,
248 }
249 }
250}
251
252#[derive(Debug, Clone)]
253enum CandidateRoot {
254 All,
255 Ancestry(OpId),
256}
257
258pub trait IntentResolver {
263 fn session_of(&self, intent_id: &IntentId) -> Option<SessionId>;
265}
266
267pub fn evaluate(
272 op_log: &OpLog,
273 predicate: &Predicate,
274) -> std::io::Result<Vec<OperationRecord>> {
275 if predicate.needs_intent_resolution() {
276 evaluate_with_resolver(op_log, predicate, &NullResolver)
280 } else {
281 evaluate_with_resolver(op_log, predicate, &NullResolver)
282 }
283}
284
285pub fn evaluate_with_resolver<R: IntentResolver + ?Sized>(
291 op_log: &OpLog,
292 predicate: &Predicate,
293 resolver: &R,
294) -> std::io::Result<Vec<OperationRecord>> {
295 let mut ancestries: std::collections::BTreeMap<OpId, std::collections::BTreeSet<OpId>> =
299 std::collections::BTreeMap::new();
300 collect_ancestor_ops(predicate, op_log, &mut ancestries)?;
301
302 let candidates = candidate_set(op_log, &predicate.candidate_root())?;
303 Ok(candidates
304 .into_iter()
305 .filter(|r| matches(r, predicate, resolver, &ancestries))
306 .collect())
307}
308
309fn collect_ancestor_ops(
310 predicate: &Predicate,
311 op_log: &OpLog,
312 out: &mut std::collections::BTreeMap<OpId, std::collections::BTreeSet<OpId>>,
313) -> std::io::Result<()> {
314 match predicate {
315 Predicate::AncestorOf { op_id } if !out.contains_key(op_id) => {
316 let set: std::collections::BTreeSet<OpId> = op_log
317 .walk_back(op_id, None)?
318 .into_iter()
319 .map(|r| r.op_id)
320 .collect();
321 out.insert(op_id.clone(), set);
322 }
323 Predicate::AncestorOf { .. } => {}
324 Predicate::And(ps) | Predicate::Or(ps) => {
325 for p in ps {
326 collect_ancestor_ops(p, op_log, out)?;
327 }
328 }
329 Predicate::Not(p) => collect_ancestor_ops(p, op_log, out)?,
330 _ => {}
331 }
332 Ok(())
333}
334
335fn candidate_set(
336 op_log: &OpLog,
337 root: &CandidateRoot,
338) -> std::io::Result<Vec<OperationRecord>> {
339 match root {
340 CandidateRoot::Ancestry(head) => op_log.walk_back(head, None),
341 CandidateRoot::All => op_log.list_all(),
342 }
343}
344
345fn matches<R: IntentResolver + ?Sized>(
346 rec: &OperationRecord,
347 predicate: &Predicate,
348 resolver: &R,
349 ancestries: &std::collections::BTreeMap<OpId, std::collections::BTreeSet<OpId>>,
350) -> bool {
351 match predicate {
352 Predicate::All => true,
353 Predicate::Intent { intent_id } => {
354 rec.op.intent_id.as_deref() == Some(intent_id)
355 }
356 Predicate::Session { session_id } => match &rec.op.intent_id {
357 Some(id) => match resolver.session_of(id) {
358 Some(s) => &s == session_id,
359 None => false,
360 },
361 None => false,
362 },
363 Predicate::AncestorOf { op_id } => match ancestries.get(op_id) {
364 Some(set) => set.contains(&rec.op_id),
365 None => false,
366 },
367 Predicate::And(ps) => ps.iter().all(|p| matches(rec, p, resolver, ancestries)),
368 Predicate::Or(ps) => ps.iter().any(|p| matches(rec, p, resolver, ancestries)),
369 Predicate::Not(p) => !matches(rec, p, resolver, ancestries),
370 }
371}
372
373struct NullResolver;
378
379impl IntentResolver for NullResolver {
380 fn session_of(&self, _intent_id: &IntentId) -> Option<SessionId> {
381 None
382 }
383}
384
385#[cfg(test)]
386mod tests {
387 use super::*;
388 use crate::operation::{Operation, OperationKind, StageTransition};
389 use std::collections::{BTreeSet, HashMap};
390
391 struct MapResolver(HashMap<IntentId, SessionId>);
393
394 impl IntentResolver for MapResolver {
395 fn session_of(&self, intent_id: &IntentId) -> Option<SessionId> {
396 self.0.get(intent_id).cloned()
397 }
398 }
399
400 fn add_op_with_intent(sig: &str, stage: &str, intent: Option<&str>) -> OperationRecord {
401 let mut op = Operation::new(
402 OperationKind::AddFunction {
403 sig_id: sig.into(),
404 stage_id: stage.into(),
405 effects: BTreeSet::new(),
406 budget_cost: None,
407 },
408 [],
409 );
410 if let Some(id) = intent {
411 op = op.with_intent(id);
412 }
413 OperationRecord::new(
414 op,
415 StageTransition::Create {
416 sig_id: sig.into(),
417 stage_id: stage.into(),
418 },
419 )
420 }
421
422 fn modify_op_with_parent_and_intent(
423 parent: &OpId,
424 sig: &str,
425 from: &str,
426 to: &str,
427 intent: Option<&str>,
428 ) -> OperationRecord {
429 let mut op = Operation::new(
430 OperationKind::ModifyBody {
431 sig_id: sig.into(),
432 from_stage_id: from.into(),
433 to_stage_id: to.into(),
434 from_budget: None,
435 to_budget: None,
436 },
437 [parent.clone()],
438 );
439 if let Some(id) = intent {
440 op = op.with_intent(id);
441 }
442 OperationRecord::new(
443 op,
444 StageTransition::Replace {
445 sig_id: sig.into(),
446 from: from.into(),
447 to: to.into(),
448 },
449 )
450 }
451
452 fn three_op_log() -> (tempfile::TempDir, OpLog, [OpId; 3]) {
455 let tmp = tempfile::tempdir().unwrap();
456 let log = OpLog::open(tmp.path()).unwrap();
457 let r0 = add_op_with_intent("fn::Int->Int", "stage-0", None);
458 let r1 = modify_op_with_parent_and_intent(
459 &r0.op_id,
460 "fn::Int->Int",
461 "stage-0",
462 "stage-1",
463 Some("intent-X"),
464 );
465 let r2 = modify_op_with_parent_and_intent(
466 &r1.op_id,
467 "fn::Int->Int",
468 "stage-1",
469 "stage-2",
470 Some("intent-Y"),
471 );
472 let ids = [r0.op_id.clone(), r1.op_id.clone(), r2.op_id.clone()];
473 log.put(&r0).unwrap();
474 log.put(&r1).unwrap();
475 log.put(&r2).unwrap();
476 (tmp, log, ids)
477 }
478
479 #[test]
480 fn all_returns_every_op() {
481 let (_tmp, log, _) = three_op_log();
482 let v = evaluate(&log, &Predicate::All).unwrap();
483 assert_eq!(v.len(), 3);
484 }
485
486 #[test]
487 fn intent_filters_by_intent_id() {
488 let (_tmp, log, _) = three_op_log();
489 let v = evaluate(&log, &Predicate::Intent { intent_id: "intent-X".into() }).unwrap();
490 assert_eq!(v.len(), 1, "exactly one op carries intent-X");
491 assert_eq!(v[0].op.intent_id.as_deref(), Some("intent-X"));
492 }
493
494 #[test]
495 fn intent_unknown_returns_empty() {
496 let (_tmp, log, _) = three_op_log();
497 let v = evaluate(&log, &Predicate::Intent { intent_id: "unknown".into() }).unwrap();
498 assert!(v.is_empty());
499 }
500
501 #[test]
502 fn ancestor_of_head_returns_full_ancestry() {
503 let (_tmp, log, ids) = three_op_log();
504 let head = ids[2].clone();
505 let v = evaluate(&log, &Predicate::AncestorOf { op_id: head.clone() }).unwrap();
506 assert_eq!(v.len(), 3, "head plus its 2 ancestors");
507 }
508
509 #[test]
510 fn ancestor_of_middle_returns_two() {
511 let (_tmp, log, ids) = three_op_log();
512 let v = evaluate(&log, &Predicate::AncestorOf { op_id: ids[1].clone() }).unwrap();
513 assert_eq!(v.len(), 2, "middle op plus its single ancestor");
514 }
515
516 #[test]
517 fn and_intersects_clauses() {
518 let (_tmp, log, ids) = three_op_log();
519 let head = ids[2].clone();
522 let v = evaluate(
523 &log,
524 &Predicate::And(vec![
525 Predicate::Intent { intent_id: "intent-Y".into() },
526 Predicate::AncestorOf { op_id: head },
527 ]),
528 )
529 .unwrap();
530 assert_eq!(v.len(), 1);
531 assert_eq!(v[0].op.intent_id.as_deref(), Some("intent-Y"));
532 }
533
534 #[test]
535 fn and_with_disjoint_clauses_is_empty() {
536 let (_tmp, log, _) = three_op_log();
537 let v = evaluate(
538 &log,
539 &Predicate::And(vec![
540 Predicate::Intent { intent_id: "intent-X".into() },
541 Predicate::Intent { intent_id: "intent-Y".into() },
542 ]),
543 )
544 .unwrap();
545 assert!(
546 v.is_empty(),
547 "no op carries both intents simultaneously",
548 );
549 }
550
551 #[test]
552 fn or_unions_clauses() {
553 let (_tmp, log, _) = three_op_log();
554 let v = evaluate(
555 &log,
556 &Predicate::Or(vec![
557 Predicate::Intent { intent_id: "intent-X".into() },
558 Predicate::Intent { intent_id: "intent-Y".into() },
559 ]),
560 )
561 .unwrap();
562 assert_eq!(v.len(), 2, "two ops carry either intent");
563 }
564
565 #[test]
566 fn not_inverts() {
567 let (_tmp, log, _) = three_op_log();
568 let v = evaluate(
569 &log,
570 &Predicate::Not(Box::new(Predicate::Intent { intent_id: "intent-X".into() })),
571 )
572 .unwrap();
573 assert_eq!(v.len(), 2);
575 assert!(v.iter().all(|r| r.op.intent_id.as_deref() != Some("intent-X")));
576 }
577
578 #[test]
579 fn session_resolves_through_resolver() {
580 let (_tmp, log, _) = three_op_log();
581 let mut m = HashMap::new();
583 m.insert("intent-X".to_string(), "session-A".to_string());
584 m.insert("intent-Y".to_string(), "session-B".to_string());
585 let resolver = MapResolver(m);
586
587 let v = evaluate_with_resolver(
588 &log,
589 &Predicate::Session { session_id: "session-A".into() },
590 &resolver,
591 )
592 .unwrap();
593 assert_eq!(v.len(), 1, "exactly one op runs under session-A");
594 assert_eq!(v[0].op.intent_id.as_deref(), Some("intent-X"));
595 }
596
597 #[test]
598 fn session_with_unknown_id_returns_empty() {
599 let (_tmp, log, _) = three_op_log();
600 let resolver = MapResolver(HashMap::new());
601 let v = evaluate_with_resolver(
602 &log,
603 &Predicate::Session { session_id: "unknown".into() },
604 &resolver,
605 )
606 .unwrap();
607 assert!(v.is_empty());
608 }
609
610 #[test]
611 fn session_without_resolver_via_evaluate_returns_empty() {
612 let (_tmp, log, _) = three_op_log();
613 let v = evaluate(&log, &Predicate::Session { session_id: "session-A".into() }).unwrap();
618 assert!(v.is_empty());
619 }
620
621 #[test]
622 fn predicate_round_trips_through_json_value() {
623 let p = Predicate::And(vec![
624 Predicate::Intent { intent_id: "i-X".into() },
625 Predicate::Or(vec![
626 Predicate::Session { session_id: "s-A".into() },
627 Predicate::Not(Box::new(Predicate::All)),
628 ]),
629 Predicate::AncestorOf { op_id: "op-123".into() },
630 ]);
631 let s = p.to_json_string();
632 let back = Predicate::from_json_str(&s).unwrap();
633 assert_eq!(p, back);
634 }
635
636 #[test]
637 fn from_json_str_rejects_unknown_tag() {
638 let s = r#"{"predicate":"custom","whatever":1}"#;
639 assert!(Predicate::from_json_str(s).is_err());
640 }
641
642 #[test]
643 fn from_json_str_rejects_missing_field() {
644 let s = r#"{"predicate":"intent"}"#;
646 assert!(Predicate::from_json_str(s).is_err());
647 }
648
649 #[test]
650 fn empty_log_returns_empty_for_all() {
651 let tmp = tempfile::tempdir().unwrap();
652 let log = OpLog::open(tmp.path()).unwrap();
653 let v = evaluate(&log, &Predicate::All).unwrap();
654 assert!(v.is_empty());
655 }
656
657 #[test]
666 fn linear_scan_performance_smoke() {
667 let tmp = tempfile::tempdir().unwrap();
668 let log = OpLog::open(tmp.path()).unwrap();
669 let mut prev: Option<OpId> = None;
670 for i in 0..100 {
671 let intent = if i % 3 == 0 { Some(format!("intent-{}", i % 5)) } else { None };
672 let rec = match &prev {
673 Some(p) => modify_op_with_parent_and_intent(
674 p,
675 &format!("fn-{i}"),
676 &format!("from-{i}"),
677 &format!("to-{i}"),
678 intent.as_deref(),
679 ),
680 None => add_op_with_intent(&format!("fn-{i}"), &format!("stage-{i}"), intent.as_deref()),
681 };
682 prev = Some(rec.op_id.clone());
683 log.put(&rec).unwrap();
684 }
685 let start = std::time::Instant::now();
686 let v = evaluate(&log, &Predicate::Intent { intent_id: "intent-2".into() }).unwrap();
687 let elapsed = start.elapsed();
688 assert!(!v.is_empty());
689 assert!(
690 elapsed < std::time::Duration::from_secs(5),
691 "100-op predicate eval took {elapsed:?}, expected < 5s",
692 );
693 }
694
695}