Skip to main content

gaze/
pipeline.rs

1use std::collections::BTreeMap;
2use std::sync::Arc;
3
4use gaze_types::{
5    EmittedTokenSpan, LeakReport, LeakReportTelemetry, LeakSuspect, Manifest, RedactionLogError,
6    RedactionLogger, SafetyNet, SafetyNetContext, SafetyNetError,
7};
8use thiserror::Error;
9
10use crate::detector::{Detection, Detector, PiiClass};
11use crate::normalize::normalize;
12use crate::policy::PolicyError;
13use crate::redaction_log::{ConflictTier, DocumentKind, RedactionEntry};
14use crate::registry::{Candidate, DetectContext, Recognizer, RecognizerRegistry};
15use crate::rule::{Action, Rule, RuleContext};
16use crate::rulepack::RulepackError;
17use crate::session::Session;
18use crate::types::{CleanDocument, RawDocument, Value};
19use crate::DictionaryBundle;
20
/// Shorthand for `std::result::Result` specialised to this module's [`Error`].
pub type Result<T> = std::result::Result<T, Error>;
22
/// Error type returned by [`Pipeline`] and [`PipelineBuilder`].
///
/// Within this module only the `Unsupported*` variants are constructed directly; `SafetyNet` and
/// `RedactionLog` arrive via `?` from safety nets and redaction loggers. The remaining variants
/// (snapshots, sqlite, export, tokens) presumably originate elsewhere in the crate.
#[derive(Debug, Error)]
#[non_exhaustive]
pub enum Error {
    /// A regular expression failed to compile.
    #[error("invalid regex: {0}")]
    InvalidRegex(#[source] regex::Error),
    /// A token string was not recognised; carries the offending token.
    #[error("unknown token: {0}")]
    UnknownToken(String),
    /// Export was requested for an ephemeral session.
    #[error("ephemeral sessions cannot be exported")]
    ExportForbidden,
    /// A document-extension integrity field was empty.
    #[error("document extension integrity fields cannot be empty")]
    EmptyDocumentIntegrity,
    /// A snapshot carried a version number that is not accepted.
    #[error("invalid snapshot version: {0}")]
    InvalidSnapshotVersion(u8),
    /// A snapshot's signature did not verify.
    #[error("snapshot signature verification failed")]
    InvalidSnapshotSignature,
    /// A snapshot is past its expiry; carries its `issued_at` and `ttl_secs` values.
    #[error("snapshot expired: issued_at={issued_at}, ttl_secs={ttl_secs}")]
    BlobExpired { issued_at: u64, ttl_secs: u64 },
    /// A snapshot could not be decoded as JSON.
    #[error("snapshot decode failed: {0}")]
    SnapshotDecode(#[source] serde_json::Error),
    /// A snapshot decoded but its payload was not acceptable.
    #[error("invalid snapshot payload")]
    InvalidSnapshotPayload,
    /// A SQLite failure, carried as its message string.
    #[error("sqlite error: {0}")]
    Sqlite(String),
    /// Converted from a [`PolicyError`].
    #[error("policy error: {0}")]
    Policy(#[from] PolicyError),
    /// Converted from a [`RulepackError`].
    #[error("rulepack error: {0}")]
    Rulepack(#[from] RulepackError),
    /// A registered [`SafetyNet`] failed while checking clean output.
    #[error("safety net error: {0}")]
    SafetyNet(#[from] SafetyNetError),
    /// A registered [`RedactionLogger`] failed to record an entry.
    #[error("redaction log error: {0}")]
    RedactionLog(#[from] RedactionLogError),
    /// Returned for [`RawDocument`] variants other than `Text` and `Structured`.
    #[error("unsupported raw document variant")]
    UnsupportedRawDocumentVariant,
    /// Returned for structured [`Value`] variants other than string, array, object, null,
    /// bool, and `I64`.
    #[error("unsupported structured value variant")]
    UnsupportedValueVariant,
    /// A rule yielded an [`Action`] other than `Tokenize`, `Redact`, `FormatPreserve`,
    /// `Generalize`, or `Preserve`.
    #[error("unsupported policy action variant")]
    UnsupportedActionVariant,
}
61
/// The stateless PII pseudonymization engine.
///
/// `Pipeline` owns the recognizer registry, rule set, redaction loggers, and safety nets.
/// Construct once per process and share across requests; create one [`Session`] per conversation
/// or request. Cloning shares the underlying components, each of which is held behind an `Arc`.
///
/// # Fail-closed
///
/// Construction fails if a recognizer fails to initialize, a validator name is unknown, or a policy
/// cannot be parsed. There is no silent degradation to a weaker detection posture.
///
/// # Thread safety
///
/// `Pipeline` is `Send + Sync`. Share across threads; create one [`Session`] per request.
///
/// # Quick example
///
/// ```rust,no_run
/// use gaze::{CleanDocument, Pipeline, RawDocument, Scope, Session};
///
/// let pipeline = Pipeline::builder().build()?;
/// let session = Session::new(Scope::Ephemeral)?;
/// let CleanDocument::Text(clean) = pipeline.redact(
///     &session,
///     RawDocument::Text("test".into()),
/// )? else {
///     panic!("expected text variant");
/// };
/// # let _ = clean;
/// # Ok::<(), Box<dyn std::error::Error>>(())
/// ```
// NOTE(review): `PipelineBuilder::build` in this module always returns `Ok`; the fail-closed
// guarantees above presumably come from recognizer/rulepack construction elsewhere — confirm.
#[derive(Clone)]
pub struct Pipeline {
    /// Recognizers consulted on every redaction call.
    registry: Arc<RecognizerRegistry>,
    /// Audit sinks; each receives every redaction entry, in registration order.
    redaction_loggers: Vec<Arc<dyn RedactionLogger>>,
    /// Post-redaction leak scanners; they inspect clean output and never modify it.
    safety_nets: Vec<Arc<dyn SafetyNet>>,
    /// Rules evaluated in order; the first rule that returns an action for a class wins.
    rules: Vec<Arc<dyn Rule>>,
}
100
/// Observer-only safety-net scan result.
///
/// Produced by the `scan_safety_nets*` methods, which inspect text without redacting it.
#[derive(Debug, Clone, Default)]
#[non_exhaustive]
pub struct SafetyNetResult {
    /// Number of safety nets registered for this pipeline.
    ///
    /// This counts every registered net, including nets skipped because their supported
    /// locales did not intersect the requested chain (those appear as `LocaleSkipped`
    /// telemetry in `report`).
    pub nets_run: usize,
    /// Metadata-only leak report aggregated across scanned leaves.
    pub report: LeakReport,
}
110
111impl std::fmt::Debug for Pipeline {
112    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
113        f.debug_struct("Pipeline").finish_non_exhaustive()
114    }
115}
116
117impl Pipeline {
118    pub fn builder() -> PipelineBuilder {
119        PipelineBuilder::default()
120    }
121
122    pub fn with_redaction_logger<L>(mut self, logger: L) -> Pipeline
123    where
124        L: RedactionLogger + 'static,
125    {
126        self.redaction_loggers.push(Arc::new(logger));
127        self
128    }
129
130    pub fn with_safety_net<N>(mut self, safety_net: N) -> Pipeline
131    where
132        N: SafetyNet + 'static,
133    {
134        self.safety_nets.push(Arc::new(safety_net));
135        self
136    }
137
138    /// Redacts using the default `[Global]` locale chain.
139    ///
140    /// Prefer `redact_with_context` when policy, CLI, or rulepack locale
141    /// defaults should constrain which recognizers run.
142    pub fn redact(&self, session: &Session, raw: RawDocument) -> Result<CleanDocument> {
143        let locale_chain = [crate::LocaleTag::Global];
144        self.redact_with_context(session, raw, &locale_chain)
145    }
146
147    pub fn redact_with_context(
148        &self,
149        session: &Session,
150        raw: RawDocument,
151        locale_chain: &[crate::LocaleTag],
152    ) -> Result<CleanDocument> {
153        let dictionaries = DictionaryBundle::default();
154        self.redact_with_detect_context(session, raw, locale_chain, &dictionaries)
155    }
156
157    pub fn redact_with_detect_context(
158        &self,
159        session: &Session,
160        raw: RawDocument,
161        locale_chain: &[crate::LocaleTag],
162        dictionaries: &DictionaryBundle,
163    ) -> Result<CleanDocument> {
164        match raw {
165            RawDocument::Structured(structured_fields) => redact_structured(
166                self,
167                session,
168                structured_fields,
169                DocumentKind::Structured,
170                locale_chain,
171                dictionaries,
172            ),
173            RawDocument::Text(text) => Ok(CleanDocument::Text(self.redact_text(
174                session,
175                &text,
176                None,
177                DocumentKind::Text,
178                locale_chain,
179                dictionaries,
180            )?)),
181            _ => Err(Error::UnsupportedRawDocumentVariant),
182        }
183    }
184
185    pub fn clean_with_safety_net(
186        &self,
187        session: &Session,
188        raw: RawDocument,
189        locale_chain: &[crate::LocaleTag],
190    ) -> Result<(CleanDocument, Vec<EmittedTokenSpan>, LeakReport)> {
191        let dictionaries = DictionaryBundle::default();
192        self.clean_with_safety_net_detect_context(session, raw, locale_chain, &dictionaries)
193    }
194
195    pub fn clean_with_safety_net_detect_context(
196        &self,
197        session: &Session,
198        raw: RawDocument,
199        locale_chain: &[crate::LocaleTag],
200        dictionaries: &DictionaryBundle,
201    ) -> Result<(CleanDocument, Vec<EmittedTokenSpan>, LeakReport)> {
202        match raw {
203            RawDocument::Structured(structured_fields) => {
204                let mut report = LeakReport::default();
205                let clean = redact_structured_with_safety_net(
206                    self,
207                    session,
208                    structured_fields,
209                    locale_chain,
210                    dictionaries,
211                    &mut report,
212                )?;
213                Ok((CleanDocument::Structured(clean), Vec::new(), report))
214            }
215            RawDocument::Text(text) => {
216                let clean = self.redact_text_with_manifest(
217                    session,
218                    &text,
219                    None,
220                    DocumentKind::Text,
221                    locale_chain,
222                    dictionaries,
223                )?;
224                let report = self.run_safety_nets(
225                    session,
226                    &clean.text,
227                    &Manifest::from_spans(clean.manifest.clone()),
228                    DocumentKind::Text,
229                    locale_chain,
230                    None,
231                )?;
232                Ok((CleanDocument::Text(clean.text), clean.manifest, report))
233            }
234            _ => Err(Error::UnsupportedRawDocumentVariant),
235        }
236    }
237
238    pub fn scan_safety_nets(
239        &self,
240        session: &Session,
241        clean_text: &str,
242        locale_chain: &[crate::LocaleTag],
243    ) -> Result<SafetyNetResult> {
244        let nets_run = self.safety_nets.len();
245        if nets_run == 0 {
246            return Ok(SafetyNetResult {
247                nets_run,
248                report: LeakReport::default(),
249            });
250        }
251
252        let report = self.run_safety_nets(
253            session,
254            clean_text,
255            &Manifest::default(),
256            DocumentKind::Text,
257            locale_chain,
258            None,
259        )?;
260        Ok(SafetyNetResult { nets_run, report })
261    }
262
263    pub fn scan_safety_nets_structured(
264        &self,
265        session: &Session,
266        document: &BTreeMap<String, Value>,
267        locale_chain: &[crate::LocaleTag],
268    ) -> Result<SafetyNetResult> {
269        let nets_run = self.safety_nets.len();
270        if nets_run == 0 {
271            return Ok(SafetyNetResult {
272                nets_run,
273                report: LeakReport::default(),
274            });
275        }
276
277        let mut report = LeakReport::default();
278        for (key, value) in document {
279            walk_value_for_safety_net_scan(self, session, value, key, locale_chain, &mut report)?;
280        }
281        Ok(SafetyNetResult { nets_run, report })
282    }
283
284    #[allow(clippy::too_many_arguments)]
285    fn redact_text(
286        &self,
287        session: &Session,
288        text: &str,
289        field_name: Option<&str>,
290        document_kind: DocumentKind,
291        locale_chain: &[crate::LocaleTag],
292        dictionaries: &DictionaryBundle,
293    ) -> Result<String> {
294        Ok(self
295            .redact_text_with_manifest(
296                session,
297                text,
298                field_name,
299                document_kind,
300                locale_chain,
301                dictionaries,
302            )?
303            .text)
304    }
305
306    #[allow(clippy::too_many_arguments)]
307    fn redact_text_with_manifest(
308        &self,
309        session: &Session,
310        text: &str,
311        field_name: Option<&str>,
312        document_kind: DocumentKind,
313        locale_chain: &[crate::LocaleTag],
314        dictionaries: &DictionaryBundle,
315    ) -> Result<CleanText> {
316        let normalized = normalize(text);
317        let spans = &normalized.spans;
318        let ctx = DetectContext::new(locale_chain, dictionaries);
319        let resolved = self
320            .registry
321            .detect_all_resolved(&normalized.text, &ctx)
322            .into_iter()
323            .filter_map(|candidate| translate_candidate(candidate, spans))
324            .collect::<Vec<_>>();
325        let losers = merged_losers(&resolved);
326        let mut detections = resolved
327            .into_iter()
328            .map(IndexedDetection::from)
329            .collect::<Vec<_>>();
330        for loser in &losers {
331            self.log_entry(
332                session,
333                loser,
334                field_name,
335                document_kind,
336                self.action_for(&loser.detection, &build_context(field_name)),
337                true,
338            )?;
339        }
340
341        detections.sort_by_key(|d| d.detection.span.start);
342        let mut out = String::with_capacity(text.len());
343        let mut emitted = Vec::with_capacity(detections.len());
344        let mut cursor = 0usize;
345
346        for detection in detections {
347            let raw = text[detection.detection.span.clone()].to_string();
348            let context = build_context(field_name);
349            let action = self.action_for(&detection.detection, &context);
350            self.log_entry(
351                session,
352                &detection,
353                field_name,
354                document_kind,
355                action,
356                false,
357            )?;
358
359            let replacement = match action {
360                Action::Tokenize => Some(session.tokenize_with_family(
361                    &detection.family,
362                    &detection.detection.class,
363                    &raw,
364                )?),
365                Action::Redact => Some("[REDACTED]".to_string()),
366                Action::FormatPreserve => {
367                    Some(session.format_preserving_fake(&detection.detection.class, &raw)?)
368                }
369                Action::Generalize => Some(generalize_token(&detection.detection.class)),
370                Action::Preserve => None,
371                _ => return Err(Error::UnsupportedActionVariant),
372            };
373
374            let span = detection.detection.span;
375            if span.start > cursor {
376                out.push_str(&text[cursor..span.start]);
377            }
378            match replacement {
379                Some(replacement) => {
380                    let clean_start = out.len();
381                    out.push_str(&replacement);
382                    emitted.push(EmittedTokenSpan::new(
383                        clean_start..out.len(),
384                        span.clone(),
385                        detection.detection.class,
386                    ));
387                }
388                None => out.push_str(&text[span.clone()]),
389            }
390            cursor = span.end;
391        }
392
393        if cursor < text.len() {
394            out.push_str(&text[cursor..]);
395        }
396
397        Ok(CleanText {
398            text: out,
399            manifest: emitted,
400        })
401    }
402
403    fn run_safety_nets(
404        &self,
405        session: &Session,
406        clean_text: &str,
407        manifest: &Manifest,
408        document_kind: DocumentKind,
409        locale_chain: &[crate::LocaleTag],
410        field_path: Option<&str>,
411    ) -> Result<LeakReport> {
412        if self.safety_nets.is_empty() {
413            return Ok(LeakReport::default());
414        }
415
416        let mut suspects = Vec::<LeakSuspect>::new();
417        let mut telemetry = Vec::new();
418        let active = gaze_types::LocaleChain::from(locale_chain);
419        for net in &self.safety_nets {
420            if !active.intersects(net.supported_locales()) {
421                telemetry.push(LeakReportTelemetry::LocaleSkipped {
422                    safety_net_id: net.id().to_string(),
423                    document_kind,
424                    field_path: field_path.map(str::to_string),
425                });
426                continue;
427            }
428
429            let context = SafetyNetContext::new(
430                manifest,
431                locale_chain,
432                document_kind,
433                Some(session.audit_session_id()),
434                field_path,
435            );
436            let mut reported = net.check(clean_text, context)?;
437            if let Some(path) = field_path {
438                for suspect in &mut reported {
439                    if suspect.field_path.is_none() {
440                        suspect.field_path = Some(path.to_string());
441                    }
442                }
443            }
444            suspects.extend(reported);
445        }
446
447        Ok(LeakReport::from_parts(suspects, telemetry))
448    }
449
450    fn action_for(&self, detection: &Detection, context: &RuleContext) -> Action {
451        self.rules
452            .iter()
453            .find_map(|rule| rule.action(&detection.class, context))
454            .unwrap_or(Action::Preserve)
455    }
456
457    fn log_entry(
458        &self,
459        session: &Session,
460        detection: &IndexedDetection,
461        field_name: Option<&str>,
462        document_kind: DocumentKind,
463        action: Action,
464        conflict_loser: bool,
465    ) -> Result<()> {
466        let entry = RedactionEntry::new(
467            detection.detection.source.clone(),
468            detection.detection.class.clone(),
469            action,
470            field_name.map(str::to_string),
471            document_kind,
472            conflict_loser,
473            detection.decided_by,
474            crate::redaction_log::current_epoch_ms(),
475            Some(session.audit_session_id().to_string()),
476        );
477
478        for logger in &self.redaction_loggers {
479            logger.log(&entry)?;
480        }
481
482        Ok(())
483    }
484}
485
/// A detection paired with the conflict tier that decided it and its token family.
#[derive(Clone)]
struct IndexedDetection {
    /// Span (in original-text coordinates), class, and source of the detection.
    detection: Detection,
    /// Conflict-resolution tier, forwarded to the redaction log.
    decided_by: ConflictTier,
    /// Token family passed to `Session::tokenize_with_family` for `Action::Tokenize`.
    family: String,
}
492
/// Output of redacting a single text value.
struct CleanText {
    /// The rewritten text.
    text: String,
    /// One entry per replaced span; preserved detections emit no entry.
    manifest: Vec<EmittedTokenSpan>,
}
497
/// Builder for [`Pipeline`].
///
/// Obtain via [`Pipeline::builder()`]. Chain `.recognizer()` (or `.detector()` for legacy
/// detectors), `.rule()`, optionally `.redaction_logger()` / `.register_safety_net()`, then call
/// `.build()`.
///
/// For bundled defaults (core rulepack + locale-aware recognizers without manual wiring), use
/// `gaze_assembly::CorePipelineConfig` instead.
#[derive(Default)]
pub struct PipelineBuilder {
    /// Recognizers, registered with the registry in this order by `build`.
    recognizers: Vec<Arc<dyn Recognizer>>,
    /// Redaction loggers, in registration order.
    redaction_loggers: Vec<Arc<dyn RedactionLogger>>,
    /// Safety nets, in registration order.
    safety_nets: Vec<Arc<dyn SafetyNet>>,
    /// Rules, in registration order (first match wins at redaction time).
    rules: Vec<Arc<dyn Rule>>,
}
512
513impl PipelineBuilder {
514    pub fn detector<D>(mut self, detector: D) -> Self
515    where
516        D: Detector + 'static,
517    {
518        self.recognizers
519            .push(Arc::new(DetectorRecognizer::new(detector)));
520        self
521    }
522
523    pub fn recognizer<R>(mut self, recognizer: R) -> Self
524    where
525        R: Recognizer + 'static,
526    {
527        self.recognizers.push(Arc::new(recognizer));
528        self
529    }
530
531    pub fn rule<R>(mut self, rule: R) -> Self
532    where
533        R: Rule + 'static,
534    {
535        self.rules.push(Arc::new(rule));
536        self
537    }
538
539    pub fn redaction_logger<L>(mut self, logger: L) -> Self
540    where
541        L: RedactionLogger + 'static,
542    {
543        self.redaction_loggers.push(Arc::new(logger));
544        self
545    }
546
547    pub fn register_safety_net<N>(mut self, safety_net: N) -> Self
548    where
549        N: SafetyNet + 'static,
550    {
551        self.safety_nets.push(Arc::new(safety_net));
552        self
553    }
554
555    pub fn build(self) -> Result<Pipeline> {
556        let mut registry = RecognizerRegistry::builder();
557        for recognizer in self.recognizers {
558            registry = registry.register_arc(recognizer);
559        }
560        Ok(Pipeline {
561            registry: Arc::new(registry.build()),
562            redaction_loggers: self.redaction_loggers,
563            safety_nets: self.safety_nets,
564            rules: self.rules,
565        })
566    }
567}
568
569fn redact_structured(
570    pipeline: &Pipeline,
571    session: &Session,
572    fields: BTreeMap<String, Value>,
573    document_kind: DocumentKind,
574    locale_chain: &[crate::LocaleTag],
575    dictionaries: &DictionaryBundle,
576) -> Result<CleanDocument> {
577    let mut clean = BTreeMap::new();
578    for (key, value) in fields {
579        let path = format!("$.{key}");
580        clean.insert(
581            key.clone(),
582            redact_structured_value(
583                pipeline,
584                session,
585                value,
586                &key,
587                &path,
588                document_kind,
589                locale_chain,
590                dictionaries,
591            )?,
592        );
593    }
594    Ok(CleanDocument::Structured(clean))
595}
596
597#[allow(clippy::too_many_arguments)]
598fn redact_structured_value(
599    pipeline: &Pipeline,
600    session: &Session,
601    value: Value,
602    field_name: &str,
603    field_path: &str,
604    document_kind: DocumentKind,
605    locale_chain: &[crate::LocaleTag],
606    dictionaries: &DictionaryBundle,
607) -> Result<Value> {
608    match value {
609        Value::String(text) => Ok(Value::String(pipeline.redact_text(
610            session,
611            &text,
612            Some(field_name),
613            document_kind,
614            locale_chain,
615            dictionaries,
616        )?)),
617        Value::Array(values) => values
618            .into_iter()
619            .enumerate()
620            .map(|(idx, value)| {
621                redact_structured_value(
622                    pipeline,
623                    session,
624                    value,
625                    field_name,
626                    &format!("{field_path}[{idx}]"),
627                    document_kind,
628                    locale_chain,
629                    dictionaries,
630                )
631            })
632            .collect::<Result<Vec<_>>>()
633            .map(Value::Array),
634        Value::Object(fields) => {
635            let mut clean = BTreeMap::new();
636            for (key, value) in fields {
637                let child_path = format!("{field_path}.{key}");
638                clean.insert(
639                    key.clone(),
640                    redact_structured_value(
641                        pipeline,
642                        session,
643                        value,
644                        &key,
645                        &child_path,
646                        document_kind,
647                        locale_chain,
648                        dictionaries,
649                    )?,
650                );
651            }
652            Ok(Value::Object(clean))
653        }
654        Value::Null | Value::Bool(_) | Value::I64(_) => Ok(value),
655        _ => Err(Error::UnsupportedValueVariant),
656    }
657}
658
659#[allow(clippy::too_many_arguments)]
660fn redact_structured_with_safety_net(
661    pipeline: &Pipeline,
662    session: &Session,
663    fields: BTreeMap<String, Value>,
664    locale_chain: &[crate::LocaleTag],
665    dictionaries: &DictionaryBundle,
666    report: &mut LeakReport,
667) -> Result<BTreeMap<String, Value>> {
668    let mut clean = BTreeMap::new();
669    for (key, value) in fields {
670        let path = format!("$.{key}");
671        clean.insert(
672            key.clone(),
673            redact_structured_value_with_safety_net(
674                pipeline,
675                session,
676                value,
677                &key,
678                &path,
679                locale_chain,
680                dictionaries,
681                report,
682            )?,
683        );
684    }
685    Ok(clean)
686}
687
688#[allow(clippy::too_many_arguments)]
689fn redact_structured_value_with_safety_net(
690    pipeline: &Pipeline,
691    session: &Session,
692    value: Value,
693    field_name: &str,
694    field_path: &str,
695    locale_chain: &[crate::LocaleTag],
696    dictionaries: &DictionaryBundle,
697    report: &mut LeakReport,
698) -> Result<Value> {
699    match value {
700        Value::String(text) => {
701            if text.is_empty() {
702                return Ok(Value::String(text));
703            }
704            let clean = pipeline.redact_text_with_manifest(
705                session,
706                &text,
707                Some(field_name),
708                DocumentKind::Structured,
709                locale_chain,
710                dictionaries,
711            )?;
712            // For RawDocument::Structured, locale gating uses the session-level
713            // locale chain across all fields; fields have no locale annotations.
714            let field_report = pipeline.run_safety_nets(
715                session,
716                &clean.text,
717                &Manifest::from_spans(clean.manifest),
718                DocumentKind::Structured,
719                locale_chain,
720                Some(field_path),
721            )?;
722            report.extend(field_report);
723            Ok(Value::String(clean.text))
724        }
725        Value::Array(values) => values
726            .into_iter()
727            .enumerate()
728            .map(|(idx, value)| {
729                redact_structured_value_with_safety_net(
730                    pipeline,
731                    session,
732                    value,
733                    field_name,
734                    &format!("{field_path}[{idx}]"),
735                    locale_chain,
736                    dictionaries,
737                    report,
738                )
739            })
740            .collect::<Result<Vec<_>>>()
741            .map(Value::Array),
742        Value::Object(fields) => {
743            let mut clean = BTreeMap::new();
744            for (key, value) in fields {
745                let child_path = format!("{field_path}.{key}");
746                clean.insert(
747                    key.clone(),
748                    redact_structured_value_with_safety_net(
749                        pipeline,
750                        session,
751                        value,
752                        &key,
753                        &child_path,
754                        locale_chain,
755                        dictionaries,
756                        report,
757                    )?,
758                );
759            }
760            Ok(Value::Object(clean))
761        }
762        Value::Null | Value::Bool(_) | Value::I64(_) => {
763            if let Some(scalar) = value.scalar_to_safety_net_string() {
764                let field_report = pipeline.run_safety_nets(
765                    session,
766                    &scalar,
767                    &Manifest::default(),
768                    DocumentKind::Structured,
769                    locale_chain,
770                    Some(field_path),
771                )?;
772                report.extend(field_report);
773            }
774            Ok(value)
775        }
776        _ => Err(Error::UnsupportedValueVariant),
777    }
778}
779
780fn walk_value_for_safety_net_scan(
781    pipeline: &Pipeline,
782    session: &Session,
783    value: &Value,
784    field_path: &str,
785    locale_chain: &[crate::LocaleTag],
786    report: &mut LeakReport,
787) -> Result<()> {
788    match value {
789        Value::String(text) => {
790            if !text.is_empty() {
791                let field_report = pipeline.run_safety_nets(
792                    session,
793                    text,
794                    &Manifest::default(),
795                    DocumentKind::Structured,
796                    locale_chain,
797                    Some(field_path),
798                )?;
799                report.extend(field_report);
800            }
801        }
802        Value::Null => {}
803        Value::Bool(_) | Value::I64(_) => {
804            if let Some(scalar) = value.scalar_to_safety_net_string() {
805                let field_report = pipeline.run_safety_nets(
806                    session,
807                    &scalar,
808                    &Manifest::default(),
809                    DocumentKind::Structured,
810                    locale_chain,
811                    Some(field_path),
812                )?;
813                report.extend(field_report);
814            }
815        }
816        Value::Array(values) => {
817            for (idx, value) in values.iter().enumerate() {
818                walk_value_for_safety_net_scan(
819                    pipeline,
820                    session,
821                    value,
822                    &format!("{field_path}[{idx}]"),
823                    locale_chain,
824                    report,
825                )?;
826            }
827        }
828        Value::Object(fields) => {
829            for (key, value) in fields {
830                walk_value_for_safety_net_scan(
831                    pipeline,
832                    session,
833                    value,
834                    &format!("{field_path}.{key}"),
835                    locale_chain,
836                    report,
837                )?;
838            }
839        }
840        _ => return Err(Error::UnsupportedValueVariant),
841    }
842    Ok(())
843}
844
845fn translate_candidate(candidate: Candidate, spans: &[(usize, usize)]) -> Option<Candidate> {
846    translate_span(candidate.span.clone(), spans).map(|span| candidate.with_span(span))
847}
848
/// Maps a span over the normalized text back to the original text.
///
/// `spans[i]` holds the original `(start, end)` range for normalized position `i`. The result
/// runs from the start of the first covered position to the end of the last. Returns `None`
/// for empty spans or spans reaching past the mapping table.
fn translate_span(
    span: std::ops::Range<usize>,
    spans: &[(usize, usize)],
) -> Option<std::ops::Range<usize>> {
    if span.is_empty() {
        return None;
    }
    // `span.start < span.end`, so when the last position is in bounds so is the first.
    let &(_, last_end) = spans.get(span.end - 1)?;
    let &(first_start, _) = spans.get(span.start)?;
    Some(first_start..last_end)
}
861
862fn merged_losers(resolved: &[Candidate]) -> Vec<IndexedDetection> {
863    resolved
864        .iter()
865        .flat_map(|winner| {
866            winner.merged_sources.iter().map(|source| IndexedDetection {
867                detection: Detection::new(
868                    winner.span.clone(),
869                    winner.class.clone(),
870                    source.clone(),
871                ),
872                decided_by: if winner.decided_by == ConflictTier::Merged {
873                    ConflictTier::Merged
874                } else {
875                    winner.decided_by
876                },
877                family: winner.token_family.clone(),
878            })
879        })
880        .collect()
881}
882
883impl From<Candidate> for IndexedDetection {
884    fn from(candidate: Candidate) -> Self {
885        Self {
886            detection: Detection::new(candidate.span, candidate.class, candidate.source),
887            decided_by: candidate.decided_by,
888            family: candidate.token_family,
889        }
890    }
891}
892
/// Adapter exposing a legacy [`Detector`] through the [`Recognizer`] interface.
struct DetectorRecognizer<D> {
    /// The wrapped legacy detector.
    detector: D,
    /// Placeholder class returned by `supported_class`; candidates keep each detection's own
    /// class.
    class: crate::PiiClass,
}
897
898impl<D> DetectorRecognizer<D> {
899    fn new(detector: D) -> Self {
900        Self {
901            detector,
902            class: crate::PiiClass::Custom("__legacy_detector__".to_string()),
903        }
904    }
905}
906
907impl<D> Recognizer for DetectorRecognizer<D>
908where
909    D: Detector + Send + Sync + 'static,
910{
911    fn id(&self) -> &str {
912        "legacy-detector"
913    }
914
915    fn supported_class(&self) -> &crate::PiiClass {
916        &self.class
917    }
918
919    fn detect(&self, input: &str, _ctx: &DetectContext<'_>) -> Vec<Candidate> {
920        self.detector
921            .detect(input)
922            .into_iter()
923            .map(|detection| {
924                let source = detection.source;
925                Candidate::new(
926                    detection.span,
927                    detection.class,
928                    source.clone(),
929                    1.0,
930                    0,
931                    None,
932                    "counter",
933                    source,
934                    ConflictTier::None,
935                    Vec::new(),
936                )
937            })
938            .collect()
939    }
940
941    fn token_family(&self) -> &str {
942        "counter"
943    }
944}
945
946fn generalize_token(class: &PiiClass) -> String {
947    match class {
948        PiiClass::Email => "[EMAIL]".to_string(),
949        PiiClass::Name => "[NAME]".to_string(),
950        PiiClass::Location => "[LOCATION]".to_string(),
951        PiiClass::Organization => "[ORGANIZATION]".to_string(),
952        PiiClass::Custom(name) => format!("[{}]", name.to_ascii_uppercase()),
953    }
954}
955
956fn build_context(field_name: Option<&str>) -> RuleContext {
957    RuleContext {
958        field_name: field_name.map(str::to_string),
959    }
960}
961
#[cfg(test)]
mod tests {
    use super::*;
    use crate::detector::{Detection, PiiClass};
    use crate::rule::{ClassRule, DefaultRule};
    use crate::session::{Scope, Session};
    use std::sync::Mutex;

    /// Shared-handle test double: callers keep an `Arc<Mutex<Vec<_>>>` and
    /// clone it into the logger, letting the builder take ownership while
    /// the test retains read access.
    struct CapturingLogger {
        entries: Arc<Mutex<Vec<RedactionEntry>>>,
    }

    impl RedactionLogger for CapturingLogger {
        fn log(&self, entry: &RedactionEntry) -> std::result::Result<(), RedactionLogError> {
            self.entries.lock().unwrap().push(entry.clone());
            Ok(())
        }
    }

    /// Detector test double that returns the same canned detections for
    /// every input, ignoring the text it is given.
    struct FixedDetector {
        detections: Vec<Detection>,
    }

    impl Detector for FixedDetector {
        fn detect(&self, _input: &str) -> Vec<Detection> {
            self.detections.clone()
        }
    }

    /// Builds a [`FixedDetector`] whose detections all report `source`,
    /// overriding whatever source each detection was created with.
    fn detector_with_detections(source: &str, detections: Vec<Detection>) -> FixedDetector {
        FixedDetector {
            detections: detections
                .into_iter()
                .map(|mut detection| {
                    detection.source = source.to_string();
                    detection
                })
                .collect(),
        }
    }

    #[test]
    fn generalize_token_custom_class_preserves_identity() {
        // Regression guard: custom classes must not collapse to an indistinct [PII].
        assert_eq!(generalize_token(&PiiClass::Custom("foo".into())), "[FOO]");
    }

    #[test]
    fn stacked_ner_detectors_resolve_via_span_conflict() {
        // Input: "Alice Smith works here" — byte spans: Alice=0..5, full name=0..11.
        let text = "Alice Smith works here";
        let short_detection = Detection::new(0..5, PiiClass::Name, "ner/bert");
        let long_detection = Detection::new(0..11, PiiClass::Name, "ner/gliner");

        let bert = detector_with_detections("ner/bert", vec![short_detection]);
        let gliner = detector_with_detections("ner/gliner", vec![long_detection]);

        let entries = Arc::new(Mutex::new(Vec::<RedactionEntry>::new()));

        let pipeline = Pipeline::builder()
            .detector(bert)
            .detector(gliner)
            .rule(ClassRule::new(PiiClass::Name, Action::Redact))
            .rule(DefaultRule::new(Action::Preserve))
            .redaction_logger(CapturingLogger {
                entries: Arc::clone(&entries),
            })
            .build()
            .expect("pipeline");

        let session = Session::new(Scope::Ephemeral).expect("session");
        let clean = pipeline
            .redact(&session, RawDocument::Text(text.to_string()))
            .expect("redact");

        let out = match clean {
            CleanDocument::Text(t) => t,
            _ => panic!("expected text"),
        };

        // Longer span wins: full name replaced, trailing " works here" preserved.
        assert_eq!(out, "[REDACTED] works here");

        let entries = entries.lock().unwrap();
        assert_eq!(
            entries.len(),
            2,
            "expected one winner + one loser: {entries:?}"
        );
        let winner = entries.iter().find(|e| !e.conflict_loser).expect("winner");
        let loser = entries.iter().find(|e| e.conflict_loser).expect("loser");
        assert_eq!(winner.source, "ner/gliner", "longer span should win");
        assert_eq!(loser.source, "ner/bert", "shorter span should lose");
        assert_eq!(loser.decided_by, ConflictTier::SpanLength);
    }

    #[test]
    fn stacked_detectors_both_win_when_spans_disjoint() {
        // Input: "Alice visited Berlin" — byte spans: Alice=0..5, Berlin=14..20.
        let text = "Alice visited Berlin";
        let alice = Detection::new(0..5, PiiClass::Name, "ner/bert");
        let berlin = Detection::new(14..20, PiiClass::Location, "ner/gliner");

        let bert = detector_with_detections("ner/bert", vec![alice]);
        let gliner = detector_with_detections("ner/gliner", vec![berlin]);

        let entries = Arc::new(Mutex::new(Vec::<RedactionEntry>::new()));

        let pipeline = Pipeline::builder()
            .detector(bert)
            .detector(gliner)
            .rule(ClassRule::new(PiiClass::Name, Action::Redact))
            .rule(ClassRule::new(PiiClass::Location, Action::Redact))
            .rule(DefaultRule::new(Action::Preserve))
            .redaction_logger(CapturingLogger {
                entries: Arc::clone(&entries),
            })
            .build()
            .expect("pipeline");

        let session = Session::new(Scope::Ephemeral).expect("session");
        let clean = pipeline
            .redact(&session, RawDocument::Text(text.to_string()))
            .expect("redact");

        let out = match clean {
            CleanDocument::Text(t) => t,
            _ => panic!("expected text"),
        };

        assert_eq!(out, "[REDACTED] visited [REDACTED]");
        let entries = entries.lock().unwrap();
        assert_eq!(entries.len(), 2, "expected one entry per span: {entries:?}");
        assert!(
            entries.iter().all(|e| !e.conflict_loser),
            "disjoint spans must not produce conflict losers: {entries:?}"
        );
        // Each detector's win must be attributed to that detector.
        assert!(
            entries.iter().any(|e| e.source == "ner/bert"),
            "missing ner/bert entry: {entries:?}"
        );
        assert!(
            entries.iter().any(|e| e.source == "ner/gliner"),
            "missing ner/gliner entry: {entries:?}"
        );
    }

    #[test]
    fn pipeline_builder_detects_email() {
        struct EmailDetector(regex::Regex);

        impl Detector for EmailDetector {
            fn detect(&self, input: &str) -> Vec<Detection> {
                self.0
                    .find_iter(input)
                    .map(|m| Detection::new(m.range(), PiiClass::Email, "regex"))
                    .collect()
            }
        }

        let pipeline = Pipeline::builder()
            .detector(EmailDetector(
                regex::Regex::new(r"(?i)\b[a-z0-9._%+\-]+@[a-z0-9.\-]+\.[a-z]{2,}\b").unwrap(),
            ))
            .rule(ClassRule::new(PiiClass::Email, Action::Tokenize))
            .rule(DefaultRule::new(Action::Preserve))
            .build()
            .unwrap();
        let session = Session::new(Scope::Ephemeral).unwrap();

        let clean = pipeline
            .redact(
                &session,
                RawDocument::Text("Reach alice@example.com today".to_string()),
            )
            .unwrap();

        match clean {
            CleanDocument::Text(text) => {
                // Prefix/suffix checks alone would accept output that still
                // embeds the address between them, so pin its absence too.
                assert!(
                    !text.contains("alice@example.com"),
                    "raw email leaked into output: {text}"
                );
                assert!(text.starts_with("Reach <"), "unexpected prefix: {text}");
                assert!(
                    text.ends_with(":Email_1> today"),
                    "unexpected suffix: {text}"
                );
            }
            other => panic!("expected text output, got {other:?}"),
        }
    }

    #[test]
    fn t21d_token_family_threads_from_recognizer_to_session() {
        /// Recognizer double that flags the literal "Dr. Schmidt" and mints
        /// tokens in the `alpha` family.
        struct FamilyRecognizer;

        impl Recognizer for FamilyRecognizer {
            fn id(&self) -> &str {
                "name.alpha"
            }

            fn supported_class(&self) -> &PiiClass {
                &PiiClass::Name
            }

            fn detect(&self, input: &str, _ctx: &DetectContext<'_>) -> Vec<Candidate> {
                let Some(start) = input.find("Dr. Schmidt") else {
                    return Vec::new();
                };
                let end = start + "Dr. Schmidt".len();
                vec![Candidate::new(
                    start..end,
                    PiiClass::Name,
                    self.id(),
                    1.0,
                    0,
                    None,
                    self.token_family(),
                    self.id(),
                    ConflictTier::None,
                    Vec::new(),
                )]
            }

            fn token_family(&self) -> &str {
                "alpha"
            }
        }

        let pipeline = Pipeline::builder()
            .recognizer(FamilyRecognizer)
            .rule(ClassRule::new(PiiClass::Name, Action::Tokenize))
            .rule(DefaultRule::new(Action::Preserve))
            .build()
            .expect("pipeline");
        let session = Session::new(Scope::Ephemeral).expect("session");

        let clean = pipeline
            .redact(
                &session,
                RawDocument::Text("Assigned to Dr. Schmidt".to_string()),
            )
            .expect("redact");
        let CleanDocument::Text(text) = clean else {
            panic!("expected text");
        };
        let token = text
            .strip_prefix("Assigned to ")
            .expect("token prefix")
            .to_string();
        let token_shape = regex::Regex::new(r"^<[0-9a-f]{8}:Name_\d+>$").unwrap();
        assert!(
            token_shape.is_match(&token),
            "unexpected token shape: {token}"
        );

        // The same value in a different family must mint a distinct token,
        // while re-tokenizing in the recognizer's family reproduces it.
        let beta = session
            .tokenize_with_family("beta", &PiiClass::Name, "Dr. Schmidt")
            .expect("beta token");
        assert_ne!(token, beta);
        assert_eq!(
            session
                .tokenize_with_family("alpha", &PiiClass::Name, "Dr. Schmidt")
                .expect("alpha token"),
            token
        );
        assert_eq!(session.restore(&token).as_deref(), Some("Dr. Schmidt"));
        assert_eq!(session.restore(&beta).as_deref(), Some("Dr. Schmidt"));
    }
}