1use std::collections::BTreeMap;
2use std::sync::Arc;
3
4use gaze_types::{
5 EmittedTokenSpan, LeakReport, LeakReportTelemetry, LeakSuspect, Manifest, RedactionLogError,
6 RedactionLogger, SafetyNet, SafetyNetContext, SafetyNetError,
7};
8use thiserror::Error;
9
10use crate::detector::{Detection, Detector, PiiClass};
11use crate::normalize::normalize;
12use crate::policy::PolicyError;
13use crate::redaction_log::{ConflictTier, DocumentKind, RedactionEntry};
14use crate::registry::{Candidate, DetectContext, Recognizer, RecognizerRegistry};
15use crate::rule::{Action, Rule, RuleContext};
16use crate::rulepack::RulepackError;
17use crate::session::Session;
18use crate::types::{CleanDocument, RawDocument, Value};
19use crate::DictionaryBundle;
20
/// Crate-local result alias; all fallible pipeline APIs return [`Error`].
pub type Result<T> = std::result::Result<T, Error>;

/// Errors produced while building or running the redaction pipeline.
///
/// `#[non_exhaustive]` leaves room for new variants without a breaking
/// change, so downstream matches must keep a wildcard arm.
#[derive(Debug, Error)]
#[non_exhaustive]
pub enum Error {
    /// A regular expression failed to compile.
    #[error("invalid regex: {0}")]
    InvalidRegex(#[source] regex::Error),
    /// An unrecognized token string was supplied.
    #[error("unknown token: {0}")]
    UnknownToken(String),
    /// Export was requested on an ephemeral session.
    #[error("ephemeral sessions cannot be exported")]
    ExportForbidden,
    /// A snapshot carried an unsupported version byte.
    #[error("invalid snapshot version: {0}")]
    InvalidSnapshotVersion(u8),
    /// A snapshot signature did not verify.
    #[error("snapshot signature verification failed")]
    InvalidSnapshotSignature,
    /// A snapshot's TTL elapsed relative to its issue time.
    #[error("snapshot expired: issued_at={issued_at}, ttl_secs={ttl_secs}")]
    BlobExpired { issued_at: u64, ttl_secs: u64 },
    /// A snapshot payload failed JSON deserialization.
    #[error("snapshot decode failed: {0}")]
    SnapshotDecode(#[source] serde_json::Error),
    /// A snapshot decoded but its contents were not usable.
    #[error("invalid snapshot payload")]
    InvalidSnapshotPayload,
    /// Error reported by the sqlite backing store (stringified).
    #[error("sqlite error: {0}")]
    Sqlite(String),
    /// Policy-layer error, auto-converted via `#[from]`.
    #[error("policy error: {0}")]
    Policy(#[from] PolicyError),
    /// Rulepack loading/validation error.
    #[error("rulepack error: {0}")]
    Rulepack(#[from] RulepackError),
    /// A safety net failed while scanning cleaned output.
    #[error("safety net error: {0}")]
    SafetyNet(#[from] SafetyNetError),
    /// A redaction logger failed to record an audit entry.
    #[error("redaction log error: {0}")]
    RedactionLog(#[from] RedactionLogError),
    /// A `RawDocument` variant this pipeline does not handle.
    #[error("unsupported raw document variant")]
    UnsupportedRawDocumentVariant,
    /// A structured `Value` variant this pipeline does not handle.
    #[error("unsupported structured value variant")]
    UnsupportedValueVariant,
    /// A policy `Action` variant this pipeline does not handle.
    #[error("unsupported policy action variant")]
    UnsupportedActionVariant,
}
59
/// Immutable, cloneable redaction engine assembled by [`PipelineBuilder`].
///
/// All members sit behind `Arc`, so `Clone` is cheap (reference-count bumps
/// only) and a pipeline can be shared across threads of use.
#[derive(Clone)]
pub struct Pipeline {
    // Conflict-resolving recognizer set built from the registered recognizers.
    registry: Arc<RecognizerRegistry>,
    // Fan-out audit sinks; every redaction decision is written to each one.
    redaction_loggers: Vec<Arc<dyn RedactionLogger>>,
    // Post-redaction leak scanners run over cleaned output.
    safety_nets: Vec<Arc<dyn SafetyNet>>,
    // Policy rules; the first rule that returns an action wins (`action_for`).
    rules: Vec<Arc<dyn Rule>>,
}
98
// Manual `Debug`: the trait-object fields carry no `Debug` bound, so we
// print only the type name with the non-exhaustive marker: `Pipeline { .. }`.
impl std::fmt::Debug for Pipeline {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("Pipeline").finish_non_exhaustive()
    }
}
104
impl Pipeline {
    /// Creates an empty [`PipelineBuilder`].
    pub fn builder() -> PipelineBuilder {
        PipelineBuilder::default()
    }

    /// Adds an extra audit logger to an already-built pipeline (consumes and
    /// returns `self`, builder-style).
    pub fn with_redaction_logger<L>(mut self, logger: L) -> Pipeline
    where
        L: RedactionLogger + 'static,
    {
        self.redaction_loggers.push(Arc::new(logger));
        self
    }

    /// Adds an extra safety net to an already-built pipeline.
    pub fn with_safety_net<N>(mut self, safety_net: N) -> Pipeline
    where
        N: SafetyNet + 'static,
    {
        self.safety_nets.push(Arc::new(safety_net));
        self
    }

    /// Redacts `raw` using only the global locale and default (empty)
    /// dictionaries. Convenience wrapper over [`Self::redact_with_context`].
    pub fn redact(&self, session: &Session, raw: RawDocument) -> Result<CleanDocument> {
        let locale_chain = [crate::LocaleTag::Global];
        self.redact_with_context(session, raw, &locale_chain)
    }

    /// Redacts `raw` with an explicit locale chain but default dictionaries.
    pub fn redact_with_context(
        &self,
        session: &Session,
        raw: RawDocument,
        locale_chain: &[crate::LocaleTag],
    ) -> Result<CleanDocument> {
        let dictionaries = DictionaryBundle::default();
        self.redact_with_detect_context(session, raw, locale_chain, &dictionaries)
    }

    /// Redacts `raw` with full detection context (locales + dictionaries).
    ///
    /// Structured documents are walked field-by-field; text documents are
    /// redacted as a single body. Unknown `RawDocument` variants are
    /// rejected with [`Error::UnsupportedRawDocumentVariant`].
    pub fn redact_with_detect_context(
        &self,
        session: &Session,
        raw: RawDocument,
        locale_chain: &[crate::LocaleTag],
        dictionaries: &DictionaryBundle,
    ) -> Result<CleanDocument> {
        match raw {
            RawDocument::Structured(structured_fields) => redact_structured(
                self,
                session,
                structured_fields,
                DocumentKind::Structured,
                locale_chain,
                dictionaries,
            ),
            RawDocument::Text(text) => Ok(CleanDocument::Text(self.redact_text(
                session,
                &text,
                None,
                DocumentKind::Text,
                locale_chain,
                dictionaries,
            )?)),
            _ => Err(Error::UnsupportedRawDocumentVariant),
        }
    }

    /// Like [`Self::redact`] but also runs the registered safety nets over
    /// the cleaned output, returning the emitted token spans and a
    /// [`LeakReport`] of residual suspects. Uses default dictionaries.
    pub fn clean_with_safety_net(
        &self,
        session: &Session,
        raw: RawDocument,
        locale_chain: &[crate::LocaleTag],
    ) -> Result<(CleanDocument, Vec<EmittedTokenSpan>, LeakReport)> {
        let dictionaries = DictionaryBundle::default();
        self.clean_with_safety_net_detect_context(session, raw, locale_chain, &dictionaries)
    }

    /// Safety-net variant with full detection context.
    ///
    /// For structured input the returned span vector is empty: per-field
    /// manifests are consumed by the per-field safety-net runs and are not
    /// surfaced to the caller. For text input the manifest of the whole body
    /// is returned alongside the report.
    pub fn clean_with_safety_net_detect_context(
        &self,
        session: &Session,
        raw: RawDocument,
        locale_chain: &[crate::LocaleTag],
        dictionaries: &DictionaryBundle,
    ) -> Result<(CleanDocument, Vec<EmittedTokenSpan>, LeakReport)> {
        match raw {
            RawDocument::Structured(structured_fields) => {
                let mut report = LeakReport::default();
                let clean = redact_structured_with_safety_net(
                    self,
                    session,
                    structured_fields,
                    locale_chain,
                    dictionaries,
                    &mut report,
                )?;
                Ok((CleanDocument::Structured(clean), Vec::new(), report))
            }
            RawDocument::Text(text) => {
                let clean = self.redact_text_with_manifest(
                    session,
                    &text,
                    None,
                    DocumentKind::Text,
                    locale_chain,
                    dictionaries,
                )?;
                let report = self.run_safety_nets(
                    session,
                    &clean.text,
                    &Manifest::from_spans(clean.manifest.clone()),
                    DocumentKind::Text,
                    locale_chain,
                    None,
                )?;
                Ok((CleanDocument::Text(clean.text), clean.manifest, report))
            }
            _ => Err(Error::UnsupportedRawDocumentVariant),
        }
    }

    /// Text redaction that discards the manifest and returns only the
    /// cleaned string.
    #[allow(clippy::too_many_arguments)]
    fn redact_text(
        &self,
        session: &Session,
        text: &str,
        field_name: Option<&str>,
        document_kind: DocumentKind,
        locale_chain: &[crate::LocaleTag],
        dictionaries: &DictionaryBundle,
    ) -> Result<String> {
        Ok(self
            .redact_text_with_manifest(
                session,
                text,
                field_name,
                document_kind,
                locale_chain,
                dictionaries,
            )?
            .text)
    }

    /// Core text-redaction routine.
    ///
    /// Steps: (1) normalize the text and detect on the normalized form;
    /// (2) translate candidate spans back to original offsets via the
    /// normalization span table, dropping untranslatable candidates;
    /// (3) log conflict losers (sources merged into winners); (4) walk the
    /// winners in span order, applying the policy action to each and
    /// stitching untouched text between them.
    ///
    /// NOTE(review): the stitching assumes resolved spans are
    /// non-overlapping and lie on char boundaries of `text` — presumably
    /// guaranteed by `detect_all_resolved` and `normalize`; an overlap or a
    /// mid-char offset would panic on the slice below. Confirm upstream.
    #[allow(clippy::too_many_arguments)]
    fn redact_text_with_manifest(
        &self,
        session: &Session,
        text: &str,
        field_name: Option<&str>,
        document_kind: DocumentKind,
        locale_chain: &[crate::LocaleTag],
        dictionaries: &DictionaryBundle,
    ) -> Result<CleanText> {
        let normalized = normalize(text);
        let spans = &normalized.spans;
        let ctx = DetectContext::new(locale_chain, dictionaries);
        let resolved = self
            .registry
            .detect_all_resolved(&normalized.text, &ctx)
            .into_iter()
            .filter_map(|candidate| translate_candidate(candidate, spans))
            .collect::<Vec<_>>();
        let losers = merged_losers(&resolved);
        let mut detections = resolved
            .into_iter()
            .map(IndexedDetection::from)
            .collect::<Vec<_>>();
        // Losers never produce replacements, but they are still logged (with
        // the action they *would* have received) for audit completeness.
        for loser in &losers {
            self.log_entry(
                session,
                loser,
                field_name,
                document_kind,
                self.action_for(&loser.detection, &build_context(field_name)),
                true,
            )?;
        }

        detections.sort_by_key(|d| d.detection.span.start);
        let mut out = String::with_capacity(text.len());
        let mut emitted = Vec::with_capacity(detections.len());
        // Byte offset into `text` up to which output has been produced.
        let mut cursor = 0usize;

        for detection in detections {
            let raw = text[detection.detection.span.clone()].to_string();
            let context = build_context(field_name);
            let action = self.action_for(&detection.detection, &context);
            self.log_entry(
                session,
                &detection,
                field_name,
                document_kind,
                action,
                false,
            )?;

            // `None` means "keep the original text verbatim" (Preserve).
            let replacement = match action {
                Action::Tokenize => Some(session.tokenize_with_family(
                    &detection.family,
                    &detection.detection.class,
                    &raw,
                )?),
                Action::Redact => Some("[REDACTED]".to_string()),
                Action::FormatPreserve => {
                    Some(session.format_preserving_fake(&detection.detection.class, &raw)?)
                }
                Action::Generalize => Some(generalize_token(&detection.detection.class)),
                Action::Preserve => None,
                _ => return Err(Error::UnsupportedActionVariant),
            };

            let span = detection.detection.span;
            // Copy the untouched gap before this detection.
            if span.start > cursor {
                out.push_str(&text[cursor..span.start]);
            }
            match replacement {
                Some(replacement) => {
                    let clean_start = out.len();
                    out.push_str(&replacement);
                    // Record where in the *clean* text the replacement landed
                    // and which original span it covers.
                    emitted.push(EmittedTokenSpan::new(
                        clean_start..out.len(),
                        span.clone(),
                        detection.detection.class,
                    ));
                }
                None => out.push_str(&text[span.clone()]),
            }
            cursor = span.end;
        }

        // Trailing text after the last detection.
        if cursor < text.len() {
            out.push_str(&text[cursor..]);
        }

        Ok(CleanText {
            text: out,
            manifest: emitted,
        })
    }

    /// Runs every registered safety net over `clean_text`.
    ///
    /// Nets whose supported locales do not intersect the active chain are
    /// skipped with a telemetry record instead of being run. Suspects that a
    /// net reports without a field path are back-filled with `field_path`
    /// (when provided) so structured leaks stay attributable.
    fn run_safety_nets(
        &self,
        session: &Session,
        clean_text: &str,
        manifest: &Manifest,
        document_kind: DocumentKind,
        locale_chain: &[crate::LocaleTag],
        field_path: Option<&str>,
    ) -> Result<LeakReport> {
        // Fast path: nothing registered, nothing to report.
        if self.safety_nets.is_empty() {
            return Ok(LeakReport::default());
        }

        let mut suspects = Vec::<LeakSuspect>::new();
        let mut telemetry = Vec::new();
        let active = gaze_types::LocaleChain::from(locale_chain);
        for net in &self.safety_nets {
            if !active.intersects(net.supported_locales()) {
                telemetry.push(LeakReportTelemetry::LocaleSkipped {
                    safety_net_id: net.id().to_string(),
                    document_kind,
                    field_path: field_path.map(str::to_string),
                });
                continue;
            }

            let context = SafetyNetContext::new(
                manifest,
                locale_chain,
                document_kind,
                Some(session.audit_session_id()),
                field_path,
            );
            let mut reported = net.check(clean_text, context)?;
            if let Some(path) = field_path {
                for suspect in &mut reported {
                    if suspect.field_path.is_none() {
                        suspect.field_path = Some(path.to_string());
                    }
                }
            }
            suspects.extend(reported);
        }

        Ok(LeakReport::from_parts(suspects, telemetry))
    }

    /// Resolves the policy action for a detection: first rule that returns
    /// an action wins; with no match the detection is preserved untouched.
    fn action_for(&self, detection: &Detection, context: &RuleContext) -> Action {
        self.rules
            .iter()
            .find_map(|rule| rule.action(&detection.class, context))
            .unwrap_or(Action::Preserve)
    }

    /// Builds one audit entry for a detection and fans it out to every
    /// registered logger; the first logger error aborts the redaction.
    fn log_entry(
        &self,
        session: &Session,
        detection: &IndexedDetection,
        field_name: Option<&str>,
        document_kind: DocumentKind,
        action: Action,
        conflict_loser: bool,
    ) -> Result<()> {
        let entry = RedactionEntry::new(
            detection.detection.source.clone(),
            detection.detection.class.clone(),
            action,
            field_name.map(str::to_string),
            document_kind,
            conflict_loser,
            detection.decided_by,
            crate::redaction_log::current_epoch_ms(),
            Some(session.audit_session_id().to_string()),
        );

        for logger in &self.redaction_loggers {
            logger.log(&entry)?;
        }

        Ok(())
    }
}
427
/// A detection paired with the metadata needed for logging and replacement.
#[derive(Clone)]
struct IndexedDetection {
    detection: Detection,
    // Conflict-resolution tier that settled this detection.
    decided_by: ConflictTier,
    // Token family passed to `Session::tokenize_with_family` for this hit.
    family: String,
}
434
/// Result of redacting one text body: the cleaned string plus the spans of
/// every replacement emitted into it.
struct CleanText {
    text: String,
    manifest: Vec<EmittedTokenSpan>,
}
439
/// Accumulates recognizers, rules, loggers, and safety nets until
/// [`PipelineBuilder::build`] freezes them into a [`Pipeline`].
#[derive(Default)]
pub struct PipelineBuilder {
    recognizers: Vec<Arc<dyn Recognizer>>,
    redaction_loggers: Vec<Arc<dyn RedactionLogger>>,
    safety_nets: Vec<Arc<dyn SafetyNet>>,
    rules: Vec<Arc<dyn Rule>>,
}
454
455impl PipelineBuilder {
456 pub fn detector<D>(mut self, detector: D) -> Self
457 where
458 D: Detector + 'static,
459 {
460 self.recognizers
461 .push(Arc::new(DetectorRecognizer::new(detector)));
462 self
463 }
464
465 pub fn recognizer<R>(mut self, recognizer: R) -> Self
466 where
467 R: Recognizer + 'static,
468 {
469 self.recognizers.push(Arc::new(recognizer));
470 self
471 }
472
473 pub fn rule<R>(mut self, rule: R) -> Self
474 where
475 R: Rule + 'static,
476 {
477 self.rules.push(Arc::new(rule));
478 self
479 }
480
481 pub fn redaction_logger<L>(mut self, logger: L) -> Self
482 where
483 L: RedactionLogger + 'static,
484 {
485 self.redaction_loggers.push(Arc::new(logger));
486 self
487 }
488
489 pub fn register_safety_net<N>(mut self, safety_net: N) -> Self
490 where
491 N: SafetyNet + 'static,
492 {
493 self.safety_nets.push(Arc::new(safety_net));
494 self
495 }
496
497 pub fn build(self) -> Result<Pipeline> {
498 let mut registry = RecognizerRegistry::builder();
499 for recognizer in self.recognizers {
500 registry = registry.register_arc(recognizer);
501 }
502 Ok(Pipeline {
503 registry: Arc::new(registry.build()),
504 redaction_loggers: self.redaction_loggers,
505 safety_nets: self.safety_nets,
506 rules: self.rules,
507 })
508 }
509}
510
511fn redact_structured(
512 pipeline: &Pipeline,
513 session: &Session,
514 fields: BTreeMap<String, Value>,
515 document_kind: DocumentKind,
516 locale_chain: &[crate::LocaleTag],
517 dictionaries: &DictionaryBundle,
518) -> Result<CleanDocument> {
519 let mut clean = BTreeMap::new();
520 for (key, value) in fields {
521 let path = format!("$.{key}");
522 clean.insert(
523 key.clone(),
524 redact_structured_value(
525 pipeline,
526 session,
527 value,
528 &key,
529 &path,
530 document_kind,
531 locale_chain,
532 dictionaries,
533 )?,
534 );
535 }
536 Ok(CleanDocument::Structured(clean))
537}
538
539#[allow(clippy::too_many_arguments)]
540fn redact_structured_value(
541 pipeline: &Pipeline,
542 session: &Session,
543 value: Value,
544 field_name: &str,
545 field_path: &str,
546 document_kind: DocumentKind,
547 locale_chain: &[crate::LocaleTag],
548 dictionaries: &DictionaryBundle,
549) -> Result<Value> {
550 match value {
551 Value::String(text) => Ok(Value::String(pipeline.redact_text(
552 session,
553 &text,
554 Some(field_name),
555 document_kind,
556 locale_chain,
557 dictionaries,
558 )?)),
559 Value::Array(values) => values
560 .into_iter()
561 .enumerate()
562 .map(|(idx, value)| {
563 redact_structured_value(
564 pipeline,
565 session,
566 value,
567 field_name,
568 &format!("{field_path}[{idx}]"),
569 document_kind,
570 locale_chain,
571 dictionaries,
572 )
573 })
574 .collect::<Result<Vec<_>>>()
575 .map(Value::Array),
576 Value::Object(fields) => {
577 let mut clean = BTreeMap::new();
578 for (key, value) in fields {
579 let child_path = format!("{field_path}.{key}");
580 clean.insert(
581 key.clone(),
582 redact_structured_value(
583 pipeline,
584 session,
585 value,
586 &key,
587 &child_path,
588 document_kind,
589 locale_chain,
590 dictionaries,
591 )?,
592 );
593 }
594 Ok(Value::Object(clean))
595 }
596 Value::Null | Value::Bool(_) | Value::I64(_) => Ok(value),
597 _ => Err(Error::UnsupportedValueVariant),
598 }
599}
600
601#[allow(clippy::too_many_arguments)]
602fn redact_structured_with_safety_net(
603 pipeline: &Pipeline,
604 session: &Session,
605 fields: BTreeMap<String, Value>,
606 locale_chain: &[crate::LocaleTag],
607 dictionaries: &DictionaryBundle,
608 report: &mut LeakReport,
609) -> Result<BTreeMap<String, Value>> {
610 let mut clean = BTreeMap::new();
611 for (key, value) in fields {
612 let path = format!("$.{key}");
613 clean.insert(
614 key.clone(),
615 redact_structured_value_with_safety_net(
616 pipeline,
617 session,
618 value,
619 &key,
620 &path,
621 locale_chain,
622 dictionaries,
623 report,
624 )?,
625 );
626 }
627 Ok(clean)
628}
629
/// Recursive counterpart of `redact_structured_value` that also runs the
/// safety nets on every leaf and folds their findings into `report`.
///
/// Strings are redacted then scanned with their emitted-span manifest;
/// passthrough scalars (`Null`/`Bool`/`I64`) are scanned in stringified form
/// (with an empty manifest) when they have a safety-net representation.
#[allow(clippy::too_many_arguments)]
fn redact_structured_value_with_safety_net(
    pipeline: &Pipeline,
    session: &Session,
    value: Value,
    field_name: &str,
    field_path: &str,
    locale_chain: &[crate::LocaleTag],
    dictionaries: &DictionaryBundle,
    report: &mut LeakReport,
) -> Result<Value> {
    match value {
        Value::String(text) => {
            // Empty strings can hold no PII and no leaks; skip the machinery.
            if text.is_empty() {
                return Ok(Value::String(text));
            }
            let clean = pipeline.redact_text_with_manifest(
                session,
                &text,
                Some(field_name),
                DocumentKind::Structured,
                locale_chain,
                dictionaries,
            )?;
            // Scan the cleaned text; the manifest tells the nets which spans
            // are already-sanctioned replacements.
            let field_report = pipeline.run_safety_nets(
                session,
                &clean.text,
                &Manifest::from_spans(clean.manifest),
                DocumentKind::Structured,
                locale_chain,
                Some(field_path),
            )?;
            report.extend(field_report);
            Ok(Value::String(clean.text))
        }
        Value::Array(values) => values
            .into_iter()
            .enumerate()
            .map(|(idx, value)| {
                redact_structured_value_with_safety_net(
                    pipeline,
                    session,
                    value,
                    field_name,
                    &format!("{field_path}[{idx}]"),
                    locale_chain,
                    dictionaries,
                    report,
                )
            })
            .collect::<Result<Vec<_>>>()
            .map(Value::Array),
        Value::Object(fields) => {
            let mut clean = BTreeMap::new();
            for (key, value) in fields {
                let child_path = format!("{field_path}.{key}");
                clean.insert(
                    key.clone(),
                    redact_structured_value_with_safety_net(
                        pipeline,
                        session,
                        value,
                        &key,
                        &child_path,
                        locale_chain,
                        dictionaries,
                        report,
                    )?,
                );
            }
            Ok(Value::Object(clean))
        }
        Value::Null | Value::Bool(_) | Value::I64(_) => {
            // Scalars are preserved as-is but still offered to the safety
            // nets in string form, so e.g. a leaked number can be flagged.
            if let Some(scalar) = value.scalar_to_safety_net_string() {
                let field_report = pipeline.run_safety_nets(
                    session,
                    &scalar,
                    &Manifest::default(),
                    DocumentKind::Structured,
                    locale_chain,
                    Some(field_path),
                )?;
                report.extend(field_report);
            }
            Ok(value)
        }
        _ => Err(Error::UnsupportedValueVariant),
    }
}
721
722fn translate_candidate(candidate: Candidate, spans: &[(usize, usize)]) -> Option<Candidate> {
723 translate_span(candidate.span.clone(), spans).map(|span| candidate.with_span(span))
724}
725
/// Translates a span of normalized-text indices into an original-text byte
/// range using the normalization span table (`spans[i]` = (start, end) byte
/// offsets of normalized unit `i`).
///
/// Returns `None` for empty spans and for spans that run past the table.
fn translate_span(
    span: std::ops::Range<usize>,
    spans: &[(usize, usize)],
) -> Option<std::ops::Range<usize>> {
    // `get` with a range rejects out-of-bounds and inverted spans in one go.
    let covered = spans.get(span.start..span.end)?;
    let (first, last) = match (covered.first(), covered.last()) {
        (Some(first), Some(last)) => (first, last),
        // Empty slice => empty candidate span => nothing to translate.
        _ => return None,
    };
    Some(first.0..last.1)
}
738
739fn merged_losers(resolved: &[Candidate]) -> Vec<IndexedDetection> {
740 resolved
741 .iter()
742 .flat_map(|winner| {
743 winner.merged_sources.iter().map(|source| IndexedDetection {
744 detection: Detection::new(
745 winner.span.clone(),
746 winner.class.clone(),
747 source.clone(),
748 ),
749 decided_by: if winner.decided_by == ConflictTier::Merged {
750 ConflictTier::Merged
751 } else {
752 winner.decided_by
753 },
754 family: winner.token_family.clone(),
755 })
756 })
757 .collect()
758}
759
760impl From<Candidate> for IndexedDetection {
761 fn from(candidate: Candidate) -> Self {
762 Self {
763 detection: Detection::new(candidate.span, candidate.class, candidate.source),
764 decided_by: candidate.decided_by,
765 family: candidate.token_family,
766 }
767 }
768}
769
/// Adapter exposing a legacy [`Detector`] through the [`Recognizer`] API.
struct DetectorRecognizer<D> {
    detector: D,
    // Sentinel class returned by `supported_class`; legacy detectors report
    // their real class on each individual detection instead.
    class: crate::PiiClass,
}
774
impl<D> DetectorRecognizer<D> {
    /// Wraps `detector`, advertising a sentinel `Custom` class (the wrapped
    /// detector supplies real classes per detection).
    fn new(detector: D) -> Self {
        Self {
            detector,
            class: crate::PiiClass::Custom("__legacy_detector__".to_string()),
        }
    }
}
783
784impl<D> Recognizer for DetectorRecognizer<D>
785where
786 D: Detector + Send + Sync + 'static,
787{
788 fn id(&self) -> &str {
789 "legacy-detector"
790 }
791
792 fn supported_class(&self) -> &crate::PiiClass {
793 &self.class
794 }
795
796 fn detect(&self, input: &str, _ctx: &DetectContext<'_>) -> Vec<Candidate> {
797 self.detector
798 .detect(input)
799 .into_iter()
800 .map(|detection| {
801 let source = detection.source;
802 Candidate::new(
803 detection.span,
804 detection.class,
805 source.clone(),
806 1.0,
807 0,
808 None,
809 "counter",
810 source,
811 ConflictTier::None,
812 Vec::new(),
813 )
814 })
815 .collect()
816 }
817
818 fn token_family(&self) -> &str {
819 "counter"
820 }
821}
822
823fn generalize_token(class: &PiiClass) -> String {
824 match class {
825 PiiClass::Email => "[EMAIL]".to_string(),
826 PiiClass::Name => "[NAME]".to_string(),
827 PiiClass::Location => "[LOCATION]".to_string(),
828 PiiClass::Organization => "[ORGANIZATION]".to_string(),
829 PiiClass::Custom(name) => format!("[{}]", name.to_ascii_uppercase()),
830 }
831}
832
833fn build_context(field_name: Option<&str>) -> RuleContext {
834 RuleContext {
835 field_name: field_name.map(str::to_string),
836 }
837}
838
839#[cfg(test)]
840mod tests {
841 use super::*;
842 use crate::detector::{Detection, PiiClass};
843 use crate::rule::{ClassRule, DefaultRule};
844 use crate::session::{Scope, Session};
845 use std::sync::Mutex;
846
    /// Test logger that appends every redaction entry to a shared vector.
    struct CapturingLogger {
        entries: Arc<Mutex<Vec<RedactionEntry>>>,
    }
853
    /// Test detector that returns a canned list of detections.
    struct FixedDetector {
        detections: Vec<Detection>,
    }
857
    // Ignores the input entirely; always yields the canned detections.
    impl Detector for FixedDetector {
        fn detect(&self, _input: &str) -> Vec<Detection> {
            self.detections.clone()
        }
    }
863
864 fn detector_with_detections(source: &str, detections: Vec<Detection>) -> FixedDetector {
865 FixedDetector {
866 detections: detections
867 .into_iter()
868 .map(|mut detection| {
869 detection.source = source.to_string();
870 detection
871 })
872 .collect(),
873 }
874 }
875
    impl RedactionLogger for CapturingLogger {
        // Clones each entry into the shared vector; never fails.
        fn log(&self, entry: &RedactionEntry) -> std::result::Result<(), RedactionLogError> {
            self.entries.lock().unwrap().push(entry.clone());
            Ok(())
        }
    }
882
883 #[test]
884 fn generalize_token_custom_class_preserves_identity() {
885 assert_eq!(generalize_token(&PiiClass::Custom("foo".into())), "[FOO]");
887 }
888
889 #[test]
890 fn stacked_ner_detectors_resolve_via_span_conflict() {
891 let text = "Alice Smith works here";
893 let short_detection = Detection::new(0..5, PiiClass::Name, "ner/bert");
894 let long_detection = Detection::new(0..11, PiiClass::Name, "ner/gliner");
895
896 let bert = detector_with_detections("ner/bert", vec![short_detection]);
897 let gliner = detector_with_detections("ner/gliner", vec![long_detection]);
898
899 let entries = Arc::new(Mutex::new(Vec::<RedactionEntry>::new()));
900
901 let pipeline = Pipeline::builder()
902 .detector(bert)
903 .detector(gliner)
904 .rule(ClassRule::new(PiiClass::Name, Action::Redact))
905 .rule(DefaultRule::new(Action::Preserve))
906 .redaction_logger(CapturingLogger {
907 entries: Arc::clone(&entries),
908 })
909 .build()
910 .expect("pipeline");
911
912 let session = Session::new(Scope::Ephemeral).expect("session");
913 let clean = pipeline
914 .redact(&session, RawDocument::Text(text.to_string()))
915 .expect("redact");
916
917 let out = match clean {
918 CleanDocument::Text(t) => t,
919 _ => panic!("expected text"),
920 };
921
922 assert_eq!(out, "[REDACTED] works here");
924
925 let entries = entries.lock().unwrap();
926 assert_eq!(
927 entries.len(),
928 2,
929 "expected one winner + one loser: {entries:?}"
930 );
931 let winner = entries.iter().find(|e| !e.conflict_loser).expect("winner");
932 let loser = entries.iter().find(|e| e.conflict_loser).expect("loser");
933 assert_eq!(winner.source, "ner/gliner", "longer span should win");
934 assert_eq!(loser.source, "ner/bert", "shorter span should lose");
935 assert_eq!(loser.decided_by, ConflictTier::SpanLength);
936 }
937
938 #[test]
939 fn stacked_detectors_both_win_when_spans_disjoint() {
940 let text = "Alice visited Berlin";
941 let alice = Detection::new(0..5, PiiClass::Name, "ner/bert");
942 let berlin = Detection::new(14..20, PiiClass::Location, "ner/gliner");
943
944 let bert = detector_with_detections("ner/bert", vec![alice]);
945 let gliner = detector_with_detections("ner/gliner", vec![berlin]);
946
947 let entries = Arc::new(Mutex::new(Vec::<RedactionEntry>::new()));
948
949 let pipeline = Pipeline::builder()
950 .detector(bert)
951 .detector(gliner)
952 .rule(ClassRule::new(PiiClass::Name, Action::Redact))
953 .rule(ClassRule::new(PiiClass::Location, Action::Redact))
954 .rule(DefaultRule::new(Action::Preserve))
955 .redaction_logger(CapturingLogger {
956 entries: Arc::clone(&entries),
957 })
958 .build()
959 .expect("pipeline");
960
961 let session = Session::new(Scope::Ephemeral).expect("session");
962 let clean = pipeline
963 .redact(&session, RawDocument::Text(text.to_string()))
964 .expect("redact");
965
966 let out = match clean {
967 CleanDocument::Text(t) => t,
968 _ => panic!("expected text"),
969 };
970
971 assert_eq!(out, "[REDACTED] visited [REDACTED]");
972 let entries = entries.lock().unwrap();
973 assert_eq!(entries.len(), 2);
974 assert!(entries.iter().all(|e| !e.conflict_loser));
975 }
976
977 #[test]
978 fn pipeline_builder_detects_email() {
979 struct EmailDetector(regex::Regex);
980
981 impl Detector for EmailDetector {
982 fn detect(&self, input: &str) -> Vec<Detection> {
983 self.0
984 .find_iter(input)
985 .map(|m| Detection::new(m.range(), PiiClass::Email, "regex"))
986 .collect()
987 }
988 }
989
990 let pipeline = Pipeline::builder()
991 .detector(EmailDetector(
992 regex::Regex::new(r"(?i)\b[a-z0-9._%+\-]+@[a-z0-9.\-]+\.[a-z]{2,}\b").unwrap(),
993 ))
994 .rule(ClassRule::new(PiiClass::Email, Action::Tokenize))
995 .rule(DefaultRule::new(Action::Preserve))
996 .build()
997 .unwrap();
998 let session = Session::new(Scope::Ephemeral).unwrap();
999
1000 let clean = pipeline
1001 .redact(
1002 &session,
1003 RawDocument::Text("Reach alice@example.com today".to_string()),
1004 )
1005 .unwrap();
1006
1007 match clean {
1008 CleanDocument::Text(text) => {
1009 assert!(text.starts_with("Reach <"));
1010 assert!(text.ends_with(":Email_1> today"));
1011 }
1012 other => panic!("expected text output, got {other:?}"),
1013 }
1014 }
1015
1016 #[test]
1017 fn t21d_token_family_threads_from_recognizer_to_session() {
1018 struct FamilyRecognizer;
1019
1020 impl Recognizer for FamilyRecognizer {
1021 fn id(&self) -> &str {
1022 "name.alpha"
1023 }
1024
1025 fn supported_class(&self) -> &PiiClass {
1026 &PiiClass::Name
1027 }
1028
1029 fn detect(&self, input: &str, _ctx: &DetectContext<'_>) -> Vec<Candidate> {
1030 let Some(start) = input.find("Dr. Schmidt") else {
1031 return Vec::new();
1032 };
1033 let end = start + "Dr. Schmidt".len();
1034 vec![Candidate::new(
1035 start..end,
1036 PiiClass::Name,
1037 self.id(),
1038 1.0,
1039 0,
1040 None,
1041 self.token_family(),
1042 self.id(),
1043 ConflictTier::None,
1044 Vec::new(),
1045 )]
1046 }
1047
1048 fn token_family(&self) -> &str {
1049 "alpha"
1050 }
1051 }
1052
1053 let pipeline = Pipeline::builder()
1054 .recognizer(FamilyRecognizer)
1055 .rule(ClassRule::new(PiiClass::Name, Action::Tokenize))
1056 .rule(DefaultRule::new(Action::Preserve))
1057 .build()
1058 .expect("pipeline");
1059 let session = Session::new(Scope::Ephemeral).expect("session");
1060
1061 let clean = pipeline
1062 .redact(
1063 &session,
1064 RawDocument::Text("Assigned to Dr. Schmidt".to_string()),
1065 )
1066 .expect("redact");
1067 let CleanDocument::Text(text) = clean else {
1068 panic!("expected text");
1069 };
1070 let token = text
1071 .strip_prefix("Assigned to ")
1072 .expect("token prefix")
1073 .to_string();
1074 assert!(regex::Regex::new(r"^<[0-9a-f]{8}:Name_\d+>$")
1075 .unwrap()
1076 .is_match(&token));
1077
1078 let beta = session
1079 .tokenize_with_family("beta", &PiiClass::Name, "Dr. Schmidt")
1080 .expect("beta token");
1081 assert_ne!(token, beta);
1082 assert_eq!(
1083 session
1084 .tokenize_with_family("alpha", &PiiClass::Name, "Dr. Schmidt")
1085 .expect("alpha token"),
1086 token
1087 );
1088 assert_eq!(session.restore(&token).as_deref(), Some("Dr. Schmidt"));
1089 assert_eq!(session.restore(&beta).as_deref(), Some("Dr. Schmidt"));
1090 }
1091}