1use std::sync::Arc;
8use std::time::{Duration, Instant};
9
10use rayon::prelude::*;
11
12use crate::constraints::{Constraint, ConstraintContext, ConstraintEvaluationResult};
13use crate::{ConstraintComponentId, Result, Shape, ShapeId};
14
/// Tuning knobs for the parallel constraint validator.
#[derive(Debug, Clone)]
pub struct ParallelConstraintConfig {
    /// Requested cap on the number of worker threads. Defaults to `0`.
    ///
    /// NOTE(review): presumably `0` means "use rayon's default pool size" —
    /// confirm against the validator.
    pub max_threads: usize,

    /// Minimum number of constraints a shape must carry for the validator to
    /// take its parallel path; shapes with fewer constraints are evaluated
    /// serially. Defaults to `4`.
    pub parallel_threshold: usize,

    /// Asks the validator to stop evaluating a shape at the first violation.
    /// Defaults to `false`.
    ///
    /// NOTE(review): intent inferred from the field name — confirm how the
    /// validator honours it.
    pub fail_fast: bool,
}

impl Default for ParallelConstraintConfig {
    /// Returns the stock configuration: no thread cap (`0`), a parallel
    /// threshold of four constraints, and fail-fast disabled.
    fn default() -> Self {
        let max_threads = 0;
        let parallel_threshold = 4;
        let fail_fast = false;

        Self {
            max_threads,
            parallel_threshold,
            fail_fast,
        }
    }
}
48
/// Result of evaluating a single constraint component, together with timing
/// and scheduling metadata recorded by the validator.
#[derive(Debug, Clone)]
pub struct ParallelConstraintOutcome {
    /// Identifier of the constraint component that was evaluated.
    pub component_id: ConstraintComponentId,
    /// Value returned by the caller-supplied evaluator for this constraint.
    pub result: ConstraintEvaluationResult,
    /// Wall-clock time measured around the evaluator call for this constraint.
    pub elapsed: Duration,
    /// Small integer hinting at where the constraint was processed.
    /// Informational only: on the serial path it is the constraint's position
    /// in the evaluation order; on the parallel path it is derived from the
    /// rayon worker set-up.
    pub worker_hint: usize,
}
65
/// Aggregated result of validating one shape against one focus-node context.
#[derive(Debug, Clone)]
pub struct ParallelValidationSummary {
    /// Identifier of the shape that was validated.
    pub shape_id: ShapeId,
    /// One entry per constraint that was evaluated; empty for deactivated
    /// shapes and for shapes without constraints.
    pub outcomes: Vec<ParallelConstraintOutcome>,
    /// `true` when every entry in `outcomes` is `Satisfied` (vacuously true
    /// when `outcomes` is empty).
    pub all_satisfied: bool,
    /// Wall-clock time spent evaluating the shape's constraints;
    /// `Duration::ZERO` for deactivated shapes.
    pub total_elapsed: Duration,
    /// Number of constraints handled on the parallel path (`0` when the
    /// serial path was taken).
    pub parallel_count: usize,
    /// Number of constraints handled on the serial path (`0` when the
    /// parallel path was taken).
    pub serial_count: usize,
}
86
87impl ParallelValidationSummary {
88 pub fn violations(&self) -> impl Iterator<Item = &ParallelConstraintOutcome> {
90 self.outcomes
91 .iter()
92 .filter(|o| matches!(o.result, ConstraintEvaluationResult::Violated { .. }))
93 }
94
95 pub fn violation_count(&self) -> usize {
97 self.violations().count()
98 }
99}
100
101pub struct ParallelConstraintValidator {
112 config: ParallelConstraintConfig,
113}
114
115impl ParallelConstraintValidator {
116 pub fn new(config: ParallelConstraintConfig) -> Self {
118 Self { config }
119 }
120
121 pub fn with_defaults() -> Self {
123 Self::new(ParallelConstraintConfig::default())
124 }
125
126 pub fn validate_shape<F>(
134 &self,
135 shape: &Shape,
136 context: &ConstraintContext,
137 constraint_evaluator: F,
138 ) -> Result<ParallelValidationSummary>
139 where
140 F: Fn(
141 &ConstraintComponentId,
142 &Constraint,
143 &ConstraintContext,
144 ) -> Result<ConstraintEvaluationResult>
145 + Send
146 + Sync,
147 {
148 if shape.deactivated {
149 return Ok(ParallelValidationSummary {
150 shape_id: shape.id.clone(),
151 outcomes: Vec::new(),
152 all_satisfied: true,
153 total_elapsed: Duration::ZERO,
154 parallel_count: 0,
155 serial_count: 0,
156 });
157 }
158
159 let constraints: Vec<(ConstraintComponentId, Constraint)> = shape
160 .constraints
161 .iter()
162 .map(|(id, c)| (id.clone(), c.clone()))
163 .collect();
164
165 let start = Instant::now();
166
167 let outcomes = if constraints.len() >= self.config.parallel_threshold {
168 self.evaluate_parallel(&constraints, context, &constraint_evaluator)?
169 } else {
170 self.evaluate_serial(&constraints, context, &constraint_evaluator)?
171 };
172
173 let total_elapsed = start.elapsed();
174 let (parallel_count, serial_count) = if constraints.len() >= self.config.parallel_threshold
175 {
176 (constraints.len(), 0)
177 } else {
178 (0, constraints.len())
179 };
180
181 let all_satisfied = outcomes
182 .iter()
183 .all(|o| matches!(o.result, ConstraintEvaluationResult::Satisfied));
184
185 Ok(ParallelValidationSummary {
186 shape_id: shape.id.clone(),
187 outcomes,
188 all_satisfied,
189 total_elapsed,
190 parallel_count,
191 serial_count,
192 })
193 }
194
195 pub fn validate_nodes<F>(
200 &self,
201 shape: &Shape,
202 contexts: &[ConstraintContext],
203 constraint_evaluator: F,
204 ) -> Result<Vec<ParallelValidationSummary>>
205 where
206 F: Fn(
207 &ConstraintComponentId,
208 &Constraint,
209 &ConstraintContext,
210 ) -> Result<ConstraintEvaluationResult>
211 + Send
212 + Sync,
213 {
214 let evaluator_arc = Arc::new(constraint_evaluator);
215
216 contexts
217 .par_iter()
218 .map(|ctx| {
219 let eval = Arc::clone(&evaluator_arc);
220 self.validate_shape(shape, ctx, |id, c, ctx| eval(id, c, ctx))
221 })
222 .collect()
223 }
224
225 fn evaluate_parallel<F>(
228 &self,
229 constraints: &[(ConstraintComponentId, Constraint)],
230 context: &ConstraintContext,
231 evaluator: &F,
232 ) -> Result<Vec<ParallelConstraintOutcome>>
233 where
234 F: Fn(
235 &ConstraintComponentId,
236 &Constraint,
237 &ConstraintContext,
238 ) -> Result<ConstraintEvaluationResult>
239 + Send
240 + Sync,
241 {
242 use std::sync::atomic::{AtomicUsize, Ordering};
245 static WORKER_COUNTER: AtomicUsize = AtomicUsize::new(0);
246
247 constraints
248 .par_iter()
249 .map(|(id, constraint)| {
250 let worker_hint = WORKER_COUNTER.fetch_add(1, Ordering::Relaxed)
251 % rayon::current_num_threads().max(1);
252 let t0 = Instant::now();
253 let result = evaluator(id, constraint, context)?;
254 let elapsed = t0.elapsed();
255 Ok(ParallelConstraintOutcome {
256 component_id: id.clone(),
257 result,
258 elapsed,
259 worker_hint,
260 })
261 })
262 .collect::<Result<Vec<_>>>()
263 }
264
265 fn evaluate_serial<F>(
268 &self,
269 constraints: &[(ConstraintComponentId, Constraint)],
270 context: &ConstraintContext,
271 evaluator: &F,
272 ) -> Result<Vec<ParallelConstraintOutcome>>
273 where
274 F: Fn(
275 &ConstraintComponentId,
276 &Constraint,
277 &ConstraintContext,
278 ) -> Result<ConstraintEvaluationResult>
279 + Send
280 + Sync,
281 {
282 constraints
283 .iter()
284 .enumerate()
285 .map(|(idx, (id, constraint))| {
286 let t0 = Instant::now();
287 let result = evaluator(id, constraint, context)?;
288 let elapsed = t0.elapsed();
289
290 if self.config.fail_fast
291 && matches!(result, ConstraintEvaluationResult::Violated { .. })
292 {
293 return Ok(ParallelConstraintOutcome {
295 component_id: id.clone(),
296 result,
297 elapsed,
298 worker_hint: idx,
299 });
300 }
301
302 Ok(ParallelConstraintOutcome {
303 component_id: id.clone(),
304 result,
305 elapsed,
306 worker_hint: idx,
307 })
308 })
309 .collect::<Result<Vec<_>>>()
310 }
311}
312
/// Running totals accumulated over many [`ParallelValidationSummary`] values
/// via `merge`. All counters start at zero (`Default`).
#[derive(Debug, Clone, Default)]
pub struct ParallelValidationStats {
    /// Number of summaries merged so far (one per validated shape/context).
    pub shapes_validated: usize,
    /// Total number of constraint outcomes across all merged summaries.
    pub constraints_evaluated: usize,
    /// Sum of the `total_elapsed` values of all merged summaries.
    pub total_elapsed: Duration,
    /// Total number of violated outcomes across all merged summaries.
    pub total_violations: usize,
    /// Running average of the per-constraint `elapsed` time, updated on
    /// every `merge`.
    pub avg_constraint_time: Duration,
}
331
332impl ParallelValidationStats {
333 pub fn merge(&mut self, summary: &ParallelValidationSummary) {
335 self.shapes_validated += 1;
336 self.constraints_evaluated += summary.outcomes.len();
337 self.total_elapsed += summary.total_elapsed;
338 self.total_violations += summary.violation_count();
339
340 let total_nanos: u128 = summary.outcomes.iter().map(|o| o.elapsed.as_nanos()).sum();
341 let count = summary.outcomes.len().max(1);
342 let avg_nanos = total_nanos / count as u128;
343 self.avg_constraint_time = Duration::from_nanos(
345 ((self.avg_constraint_time.as_nanos()
346 * (self.constraints_evaluated.saturating_sub(count)) as u128
347 + avg_nanos * count as u128)
348 / self.constraints_evaluated.max(1) as u128) as u64,
349 );
350 }
351}
352
#[cfg(test)]
mod tests {
    use super::*;
    use crate::constraints::{
        cardinality_constraints::MinCountConstraint, constraint_context::ConstraintContext,
        Constraint,
    };
    use crate::ShaclError;
    use crate::{ConstraintComponentId, Shape, ShapeId, ShapeType};
    use std::collections::HashMap;

    use oxirs_core::model::{NamedNode, Term};

    /// Builds a node shape carrying `n` `MinCount` constraints with `min_count: 0`.
    fn make_shape_with_constraints(n: usize) -> Shape {
        let shape_id = ShapeId::new("http://ex/TestShape");
        let mut shape = Shape::new(shape_id, ShapeType::NodeShape);
        for index in 0..n {
            let constraint = Constraint::MinCount(MinCountConstraint { min_count: 0 });
            let component = ConstraintComponentId::new(format!("sh:minCount_{index}"));
            shape.constraints.insert(component, constraint);
        }
        shape
    }

    /// Context whose focus node is `http://ex/Alice`.
    fn dummy_context() -> ConstraintContext {
        let alice = NamedNode::new("http://ex/Alice").expect("valid IRI");
        ConstraintContext::new(Term::NamedNode(alice), ShapeId::new("http://ex/TestShape"))
    }

    /// Evaluator that reports every constraint as satisfied.
    fn always_satisfied_evaluator(
        _id: &ConstraintComponentId,
        _c: &Constraint,
        _ctx: &ConstraintContext,
    ) -> Result<ConstraintEvaluationResult> {
        Ok(ConstraintEvaluationResult::Satisfied)
    }

    /// Evaluator that reports every constraint as violated.
    fn always_violated_evaluator(
        _id: &ConstraintComponentId,
        _c: &Constraint,
        _ctx: &ConstraintContext,
    ) -> Result<ConstraintEvaluationResult> {
        let violation = ConstraintEvaluationResult::Violated {
            violating_value: None,
            message: Some("test violation".to_string()),
            details: HashMap::new(),
        };
        Ok(violation)
    }

    /// Validates `shape` for the dummy context with a default validator and
    /// unwraps the summary.
    fn validate_with_defaults<F>(shape: &Shape, evaluator: F) -> ParallelValidationSummary
    where
        F: Fn(
                &ConstraintComponentId,
                &Constraint,
                &ConstraintContext,
            ) -> Result<ConstraintEvaluationResult>
            + Send
            + Sync,
    {
        let context = dummy_context();
        ParallelConstraintValidator::with_defaults()
            .validate_shape(shape, &context, evaluator)
            .expect("validation should succeed")
    }

    /// Ten satisfied constraints yield ten outcomes and no violations.
    #[test]
    fn test_all_satisfied_parallel() {
        let ten_constraints = make_shape_with_constraints(10);

        let summary = validate_with_defaults(&ten_constraints, always_satisfied_evaluator);

        assert!(summary.all_satisfied);
        assert_eq!(summary.violation_count(), 0);
        assert_eq!(summary.outcomes.len(), 10);
    }

    /// Six violated constraints are all reported.
    #[test]
    fn test_violations_detected_parallel() {
        let six_constraints = make_shape_with_constraints(6);

        let summary = validate_with_defaults(&six_constraints, always_violated_evaluator);

        assert!(!summary.all_satisfied);
        assert_eq!(summary.violation_count(), 6);
    }

    /// A shape below the default threshold is counted on the serial path.
    #[test]
    fn test_serial_for_small_shape() {
        let small_shape = make_shape_with_constraints(2);

        let summary = validate_with_defaults(&small_shape, always_satisfied_evaluator);

        assert_eq!(summary.serial_count, 2);
        assert_eq!(summary.parallel_count, 0);
        assert!(summary.all_satisfied);
    }

    /// Deactivated shapes produce an empty, satisfied summary even when the
    /// evaluator would report violations.
    #[test]
    fn test_deactivated_shape_skipped() {
        let mut inactive = make_shape_with_constraints(5);
        inactive.deactivated = true;

        let summary = validate_with_defaults(&inactive, always_violated_evaluator);

        assert!(summary.all_satisfied);
        assert_eq!(summary.outcomes.len(), 0);
    }

    /// Batch validation returns one satisfied summary per context.
    #[test]
    fn test_validate_nodes_batch() {
        let shape = make_shape_with_constraints(4);
        let validator = ParallelConstraintValidator::with_defaults();

        let contexts: Vec<ConstraintContext> =
            ["http://ex/Alice", "http://ex/Bob", "http://ex/Carol"]
                .iter()
                .map(|iri| {
                    let node = NamedNode::new(*iri).expect("valid IRI");
                    ConstraintContext::new(
                        Term::NamedNode(node),
                        ShapeId::new("http://ex/TestShape"),
                    )
                })
                .collect();

        let summaries = validator
            .validate_nodes(&shape, &contexts, always_satisfied_evaluator)
            .expect("batch validation should succeed");

        assert_eq!(summaries.len(), 3);
        assert!(summaries.iter().all(|summary| summary.all_satisfied));
    }

    /// Merging a single summary updates the shape, constraint and violation
    /// counters.
    #[test]
    fn test_stats_merge() {
        let five_constraints = make_shape_with_constraints(5);
        let summary = validate_with_defaults(&five_constraints, always_satisfied_evaluator);

        let mut stats = ParallelValidationStats::default();
        stats.merge(&summary);

        assert_eq!(stats.shapes_validated, 1);
        assert_eq!(stats.constraints_evaluated, 5);
        assert_eq!(stats.total_violations, 0);
    }

    /// An evaluator error surfaces as an `Err` from `validate_shape`.
    #[test]
    fn test_evaluator_error_propagated() {
        let shape = make_shape_with_constraints(4);
        let context = dummy_context();
        let validator = ParallelConstraintValidator::with_defaults();

        let outcome = validator.validate_shape(&shape, &context, |_id, _c, _ctx| {
            Err(ShaclError::ConstraintValidation(
                "simulated error".to_string(),
            ))
        });

        assert!(outcome.is_err());
    }
}
536
#[cfg(test)]
mod extended_parallel_tests {
    use super::*;
    use crate::constraints::{
        cardinality_constraints::MinCountConstraint, constraint_context::ConstraintContext,
        Constraint,
    };
    use crate::{ConstraintComponentId, Shape, ShapeId, ShapeType};
    use oxirs_core::model::{NamedNode, Term};
    use std::collections::HashMap;

    /// Builds a node shape carrying `n` `MinCount` constraints with `min_count: 0`.
    fn make_shape(n: usize) -> Shape {
        let shape_id = ShapeId::new("http://ex/TestShape");
        let mut shape = Shape::new(shape_id, ShapeType::NodeShape);
        for index in 0..n {
            let component = ConstraintComponentId::new(format!("sh:minCount_{index}"));
            let constraint = Constraint::MinCount(MinCountConstraint { min_count: 0 });
            shape.constraints.insert(component, constraint);
        }
        shape
    }

    /// Context whose focus node is the named node `iri`.
    fn ctx_for(iri: &str) -> ConstraintContext {
        let node = NamedNode::new(iri).expect("valid IRI");
        ConstraintContext::new(Term::NamedNode(node), ShapeId::new("http://ex/TestShape"))
    }

    /// Evaluator that reports every constraint as satisfied.
    fn always_ok(
        _id: &ConstraintComponentId,
        _c: &Constraint,
        _ctx: &ConstraintContext,
    ) -> Result<ConstraintEvaluationResult> {
        Ok(ConstraintEvaluationResult::Satisfied)
    }

    /// Evaluator that reports every constraint as violated.
    fn always_fail(
        _id: &ConstraintComponentId,
        _c: &Constraint,
        _ctx: &ConstraintContext,
    ) -> Result<ConstraintEvaluationResult> {
        let violation = ConstraintEvaluationResult::Violated {
            violating_value: None,
            message: Some("fail".to_string()),
            details: HashMap::new(),
        };
        Ok(violation)
    }

    /// Validates a fresh shape with `constraint_count` constraints for the
    /// focus node `focus_iri` using a default validator.
    fn summarize<F>(
        constraint_count: usize,
        focus_iri: &str,
        evaluator: F,
    ) -> ParallelValidationSummary
    where
        F: Fn(
                &ConstraintComponentId,
                &Constraint,
                &ConstraintContext,
            ) -> Result<ConstraintEvaluationResult>
            + Send
            + Sync,
    {
        let shape = make_shape(constraint_count);
        let context = ctx_for(focus_iri);
        ParallelConstraintValidator::with_defaults()
            .validate_shape(&shape, &context, evaluator)
            .expect("ok")
    }

    // --- configuration -----------------------------------------------------

    #[test]
    fn test_default_config_threshold_is_four() {
        assert_eq!(ParallelConstraintConfig::default().parallel_threshold, 4);
    }

    #[test]
    fn test_default_config_fail_fast_is_false() {
        assert!(!ParallelConstraintConfig::default().fail_fast);
    }

    #[test]
    fn test_custom_config_stored() {
        let validator = ParallelConstraintValidator::new(ParallelConstraintConfig {
            max_threads: 2,
            parallel_threshold: 10,
            fail_fast: true,
        });

        let stored = &validator.config;
        assert_eq!(stored.parallel_threshold, 10);
        assert!(stored.fail_fast);
    }

    // --- violations iterator -----------------------------------------------

    #[test]
    fn test_violations_iterator_empty_when_all_satisfied() {
        let summary = summarize(4, "http://ex/Alice", always_ok);
        assert_eq!(summary.violations().count(), 0);
    }

    #[test]
    fn test_violations_iterator_matches_violation_count() {
        let summary = summarize(6, "http://ex/Alice", always_fail);
        assert_eq!(summary.violations().count(), summary.violation_count());
    }

    // --- batch validation --------------------------------------------------

    #[test]
    fn test_validate_nodes_empty_list() {
        let shape = make_shape(4);
        let validator = ParallelConstraintValidator::with_defaults();

        let summaries = validator
            .validate_nodes(&shape, &[], always_ok)
            .expect("ok");

        assert!(summaries.is_empty());
    }

    #[test]
    fn test_validate_nodes_partial_violations() {
        let shape = make_shape(5);
        let validator = ParallelConstraintValidator::with_defaults();

        let contexts: Vec<ConstraintContext> = ["http://ex/A", "http://ex/B", "http://ex/C"]
            .iter()
            .map(|iri| ctx_for(iri))
            .collect();

        let summaries = validator
            .validate_nodes(&shape, &contexts, always_fail)
            .expect("ok");

        assert_eq!(summaries.len(), 3);
        assert!(summaries.iter().all(|summary| !summary.all_satisfied));
    }

    // --- statistics --------------------------------------------------------

    #[test]
    fn test_stats_merge_accumulates_shapes() {
        let first = summarize(4, "http://ex/A", always_ok);
        let second = summarize(4, "http://ex/B", always_ok);

        let mut stats = ParallelValidationStats::default();
        for summary in [&first, &second] {
            stats.merge(summary);
        }

        assert_eq!(stats.shapes_validated, 2);
        assert_eq!(stats.constraints_evaluated, 8);
    }

    #[test]
    fn test_stats_total_violations_accumulates() {
        let summary = summarize(3, "http://ex/A", always_fail);

        let mut stats = ParallelValidationStats::default();
        stats.merge(&summary);

        assert_eq!(stats.total_violations, 3);
    }

    #[test]
    fn test_stats_merge_twice_doubles_counts() {
        let summary = summarize(4, "http://ex/A", always_ok);

        let mut stats = ParallelValidationStats::default();
        for _ in 0..2 {
            stats.merge(&summary);
        }

        assert_eq!(stats.shapes_validated, 2);
        assert_eq!(stats.constraints_evaluated, 8);
    }

    // --- summary flags and counts ------------------------------------------

    #[test]
    fn test_all_satisfied_true_when_no_violations() {
        let summary = summarize(8, "http://ex/Alice", always_ok);
        assert!(summary.all_satisfied);
    }

    #[test]
    fn test_all_satisfied_false_when_violations_exist() {
        let summary = summarize(8, "http://ex/Alice", always_fail);
        assert!(!summary.all_satisfied);
    }

    #[test]
    fn test_outcomes_length_matches_constraint_count() {
        let expected = 7;
        let summary = summarize(expected, "http://ex/X", always_ok);
        assert_eq!(summary.outcomes.len(), expected);
    }

    #[test]
    fn test_shape_with_zero_constraints() {
        let summary = summarize(0, "http://ex/Alice", always_ok);
        assert!(summary.all_satisfied);
        assert_eq!(summary.outcomes.len(), 0);
    }

    #[test]
    fn test_above_threshold_uses_parallel_path() {
        let summary = summarize(20, "http://ex/Alice", always_ok);
        assert!(
            summary.parallel_count > 0,
            "expected parallel evaluation for 20 constraints"
        );
    }
}