adaptive_pipeline/infrastructure/services/
tee.rs

1// /////////////////////////////////////////////////////////////////////////////
2// Adaptive Pipeline
3// Copyright (c) 2025 Michael Gardner, A Bit of Help, Inc.
4// SPDX-License-Identifier: BSD-3-Clause
5// See LICENSE file in the project root.
6// /////////////////////////////////////////////////////////////////////////////
7
8// Infrastructure module - contains future features not yet fully utilized
9#![allow(dead_code, unused_imports, unused_variables)]
10//! # Tee Service
11//!
12//! Production-ready tee (data splitting) stage for the adaptive pipeline.
13//! This service provides data inspection and debugging capabilities, useful
14//! for:
15//!
16//! - **Debugging**: Inspecting data at specific pipeline stages
17//! - **Monitoring**: Capturing samples for analysis
18//! - **Validation**: Verifying transformations at intermediate stages
19//! - **Audit Trails**: Recording data flow for compliance
20//!
21//! ## Architecture
22//!
23//! This implementation demonstrates the complete pattern for creating pipeline
24//! stages:
25//!
26//! - **Config Struct**: `TeeConfig` with typed parameters
27//! - **FromParameters**: Type-safe extraction from HashMap
28//! - **Service Struct**: `TeeService` implements `StageService`
29//! - **Position**: `Any` (can be placed anywhere in the pipeline)
30//! - **Reversibility**: Pass-through (Forward and Reverse both write + pass
31//!   data)
32//!
33//! ## Usage
34//!
35//! ```rust
36//! use adaptive_pipeline::infrastructure::services::TeeService;
37//! use adaptive_pipeline_domain::services::StageService;
38//!
39//! let service = TeeService::new();
40//! // Used automatically by pipeline when configured with StageType::Transform
41//! ```
42//!
43//! ## Configuration Parameters
44//!
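//! - **output_path** (required): Path to write teed data
//!   - Example: `"/tmp/debug-output.bin"`
//!   - The file is created if it does not exist and appended to otherwise
//!   - Each chunk is written as a `--- Chunk <seq> (<len> bytes) ---` header
//!     line, followed by the formatted data and a trailing newline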
48//!
49//! - **format** (optional): Output format for teed data
50//!   - `"binary"` - Raw binary output (default)
51//!   - `"hex"` - Hexadecimal dump format
52//!   - `"text"` - UTF-8 text (lossy conversion for non-UTF8)
53//!   - Default: "binary"
54//!
55//! - **enabled** (optional): Whether tee is active
56//!   - `"true"` - Tee is active (default)
57//!   - `"false"` - Tee is disabled (pass-through only)
58//!   - Allows conditional enable/disable without removing stage
59//!
60//! ## Performance Characteristics
61//!
62//! - **Throughput**: Limited by I/O speed to tee output
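//! - **Overhead**: One file open and write per chunk, plus optional format
//!   conversion
//! - **Memory**: One formatted copy of the current chunk is built before it is
//!   written; nothing is retained between chunks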
65//! - **Latency**: Synchronous I/O (blocking writes)
66
67use adaptive_pipeline_domain::entities::{Operation, ProcessingContext, StageConfiguration, StagePosition, StageType};
68use adaptive_pipeline_domain::services::{FromParameters, StageService};
69use adaptive_pipeline_domain::value_objects::file_chunk::FileChunk;
70use adaptive_pipeline_domain::PipelineError;
71use std::collections::HashMap;
72use std::fs::OpenOptions;
73use std::io::Write;
74use std::path::PathBuf;
75
/// Encodings the tee stage can apply to a chunk before writing it out.
///
/// Chosen through the `format` stage parameter (`"binary"`, `"hex"` or
/// `"text"`, matched case-insensitively). `Binary` is used when the parameter
/// is absent.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum TeeFormat {
    /// Chunk bytes are written exactly as received.
    Binary,
    /// Chunk bytes are rendered as a hex dump: 16 bytes per line with an
    /// offset column and a printable-ASCII sidebar.
    Hex,
    /// Chunk bytes are decoded as UTF-8 (invalid sequences become U+FFFD) and
    /// written as text.
    Text,
}
86
87/// Configuration for Tee operations.
88#[derive(Debug, Clone, PartialEq, Eq)]
89pub struct TeeConfig {
90    /// Path to write teed data
91    pub output_path: PathBuf,
92    /// Output format
93    pub format: TeeFormat,
94    /// Whether tee is enabled
95    pub enabled: bool,
96}
97
98impl Default for TeeConfig {
99    fn default() -> Self {
100        Self {
101            output_path: PathBuf::from("/tmp/tee-output.bin"),
102            format: TeeFormat::Binary,
103            enabled: true,
104        }
105    }
106}
107
108/// Implementation of `FromParameters` for TeeConfig.
109///
110/// Extracts typed configuration from HashMap parameters following
111/// the pattern established by other service configs.
112impl FromParameters for TeeConfig {
113    fn from_parameters(params: &HashMap<String, String>) -> Result<Self, PipelineError> {
114        // Required: output_path
115        let output_path = params
116            .get("output_path")
117            .ok_or_else(|| PipelineError::MissingParameter("output_path is required for tee stage".into()))?
118            .into();
119
120        // Optional: format (defaults to binary)
121        let format = params
122            .get("format")
123            .map(|s| match s.to_lowercase().as_str() {
124                "binary" => Ok(TeeFormat::Binary),
125                "hex" => Ok(TeeFormat::Hex),
126                "text" => Ok(TeeFormat::Text),
127                other => Err(PipelineError::InvalidParameter(format!(
128                    "Unknown tee format: {}. Valid: binary, hex, text",
129                    other
130                ))),
131            })
132            .transpose()?
133            .unwrap_or(TeeFormat::Binary);
134
135        // Optional: enabled (defaults to true)
136        let enabled = params
137            .get("enabled")
138            .map(|s| s.to_lowercase() == "true")
139            .unwrap_or(true);
140
141        Ok(Self {
142            output_path,
143            format,
144            enabled,
145        })
146    }
147}
148
/// Production Tee service.
///
/// This service demonstrates the complete pattern for implementing pipeline
/// stages:
/// - Stateless processing (no internal state)
/// - Thread-safe (`Send + Sync`)
/// - Pass-through operation (data flows unchanged)
/// - Type-safe configuration via `FromParameters`
/// - Proper error handling with `PipelineError`
///
/// Being a field-less unit struct, the service is trivially `Copy` and can be
/// printed with `{:?}` for diagnostics.
///
/// ## Implementation Notes
///
/// - **Position**: `Any` - Can be placed anywhere in the pipeline
/// - **Reversibility**: `true` - Pass-through in both directions
/// - **Stage Type**: `Transform` - Data inspection operation
/// - **Performance**: Synchronous I/O (blocking writes)
#[derive(Debug, Clone, Copy)]
pub struct TeeService;
166
167impl TeeService {
168    /// Creates a new Tee service.
169    pub fn new() -> Self {
170        Self
171    }
172
173    /// Writes data to the tee output in the specified format.
174    fn write_tee(&self, data: &[u8], config: &TeeConfig, chunk_seq: u64) -> Result<(), PipelineError> {
175        if !config.enabled {
176            return Ok(());
177        }
178
179        // Format the data according to config
180        let formatted = match config.format {
181            TeeFormat::Binary => data.to_vec(),
182            TeeFormat::Hex => {
183                // Hex dump with ASCII sidebar (like hexdump -C)
184                let mut output = Vec::new();
185                for (i, chunk) in data.chunks(16).enumerate() {
186                    let offset = i * 16;
187                    let hex = chunk.iter().map(|b| format!("{:02x}", b)).collect::<Vec<_>>().join(" ");
188                    let ascii = chunk
189                        .iter()
190                        .map(|&b| {
191                            if b.is_ascii_graphic() || b == b' ' {
192                                b as char
193                            } else {
194                                '.'
195                            }
196                        })
197                        .collect::<String>();
198                    let line = format!("{:08x}  {:<48}  |{}|\n", offset, hex, ascii);
199                    output.extend_from_slice(line.as_bytes());
200                }
201                output
202            }
203            TeeFormat::Text => String::from_utf8_lossy(data).into_owned().into_bytes(),
204        };
205
206        // Write to file (append mode)
207        let mut file = OpenOptions::new()
208            .create(true)
209            .append(true)
210            .open(&config.output_path)
211            .map_err(|e| {
212                PipelineError::ProcessingFailed(format!(
213                    "Failed to open tee output file '{}': {}",
214                    config.output_path.display(),
215                    e
216                ))
217            })?;
218
219        // Write chunk separator for clarity
220        writeln!(file, "--- Chunk {} ({} bytes) ---", chunk_seq, data.len())
221            .map_err(|e| PipelineError::ProcessingFailed(format!("Failed to write tee separator: {}", e)))?;
222
223        file.write_all(&formatted)
224            .map_err(|e| PipelineError::ProcessingFailed(format!("Failed to write tee data: {}", e)))?;
225
226        writeln!(file).map_err(|e| PipelineError::ProcessingFailed(format!("Failed to write tee newline: {}", e)))?;
227
228        Ok(())
229    }
230}
231
232impl Default for TeeService {
233    fn default() -> Self {
234        Self::new()
235    }
236}
237
238/// Implementation of `StageService` for Tee.
239///
240/// This demonstrates the complete pattern that all stages follow:
241/// 1. Extract typed config via `FromParameters`
242/// 2. Write data to tee output
243/// 3. Pass data through unchanged
244/// 4. Return original chunk
245impl StageService for TeeService {
246    fn process_chunk(
247        &self,
248        chunk: FileChunk,
249        config: &StageConfiguration,
250        _context: &mut ProcessingContext,
251    ) -> Result<FileChunk, PipelineError> {
252        // Type-safe config extraction using FromParameters trait
253        let tee_config = TeeConfig::from_parameters(&config.parameters)?;
254
255        // Write data to tee output (both Forward and Reverse operations)
256        tracing::debug!(
257            chunk_seq = chunk.sequence_number(),
258            output_path = %tee_config.output_path.display(),
259            format = ?tee_config.format,
260            enabled = tee_config.enabled,
261            operation = %config.operation,
262            "Tee operation"
263        );
264
265        self.write_tee(chunk.data(), &tee_config, chunk.sequence_number())?;
266
267        // Pass data through unchanged
268        Ok(chunk)
269    }
270
271    fn position(&self) -> StagePosition {
272        // Any: Can be placed anywhere in the pipeline
273        // Reason: Diagnostic stage that doesn't affect data flow
274        StagePosition::Any
275    }
276
277    fn is_reversible(&self) -> bool {
278        // Pass-through: Works in both directions
279        true
280    }
281
282    fn stage_type(&self) -> StageType {
283        // Transform: Data inspection operation
284        StageType::Transform
285    }
286}
287
#[cfg(test)]
mod tests {
    use super::*;
    use adaptive_pipeline_domain::entities::{SecurityContext, SecurityLevel};
    use std::fs;
    use std::path::PathBuf;
    use tempfile::TempDir;

    /// Builds a stage parameter map from `(key, value)` pairs.
    fn params_of(pairs: &[(&str, &str)]) -> HashMap<String, String> {
        pairs
            .iter()
            .map(|&(key, value)| (key.to_string(), value.to_string()))
            .collect()
    }

    /// Tees `b"Hello, World!"` as chunk 0 into a fresh temporary file using
    /// `format`, then returns everything that was written to that file.
    fn tee_hello(file_name: &str, format: TeeFormat) -> String {
        let temp_dir = TempDir::new().unwrap();
        let output_path = temp_dir.path().join(file_name);
        let config = TeeConfig {
            output_path: output_path.clone(),
            format,
            enabled: true,
        };

        let test_data = b"Hello, World!";
        TeeService::new().write_tee(test_data, &config, 0).unwrap();

        fs::read_to_string(&output_path).unwrap()
    }

    #[test]
    fn test_from_parameters_minimal() {
        let params = params_of(&[("output_path", "/tmp/test.bin")]);

        let config = TeeConfig::from_parameters(&params).unwrap();

        assert_eq!(config.output_path, PathBuf::from("/tmp/test.bin"));
        assert_eq!(config.format, TeeFormat::Binary);
        assert!(config.enabled);
    }

    #[test]
    fn test_from_parameters_full() {
        let params = params_of(&[
            ("output_path", "/tmp/test.hex"),
            ("format", "hex"),
            ("enabled", "false"),
        ]);

        let config = TeeConfig::from_parameters(&params).unwrap();

        assert_eq!(config.output_path, PathBuf::from("/tmp/test.hex"));
        assert_eq!(config.format, TeeFormat::Hex);
        assert!(!config.enabled);
    }

    #[test]
    fn test_from_parameters_missing_output_path() {
        let params = params_of(&[]);

        let result = TeeConfig::from_parameters(&params);

        assert!(result.is_err());
        assert!(result.unwrap_err().to_string().contains("output_path"));
    }

    #[test]
    fn test_from_parameters_invalid_format() {
        let params = params_of(&[("output_path", "/tmp/test.bin"), ("format", "invalid")]);

        let result = TeeConfig::from_parameters(&params);

        assert!(result.is_err());
    }

    #[test]
    fn test_tee_binary_format() {
        let contents = tee_hello("tee-binary.bin", TeeFormat::Binary);

        assert!(contents.contains("Chunk 0"));
        assert!(contents.contains("Hello, World!"));
    }

    #[test]
    fn test_tee_hex_format() {
        let contents = tee_hello("tee-hex.txt", TeeFormat::Hex);

        assert!(contents.contains("Chunk 0"));
        assert!(contents.contains("48 65 6c 6c 6f")); // "Hello" in hex
    }

    #[test]
    fn test_tee_text_format() {
        let contents = tee_hello("tee-text.txt", TeeFormat::Text);

        assert!(contents.contains("Chunk 0"));
        assert!(contents.contains("Hello, World!"));
    }

    #[test]
    fn test_tee_disabled() {
        let temp_dir = TempDir::new().unwrap();
        let output_path = temp_dir.path().join("tee-disabled.bin");
        let config = TeeConfig {
            output_path: output_path.clone(),
            format: TeeFormat::Binary,
            enabled: false,
        };

        TeeService::new().write_tee(b"test", &config, 0).unwrap();

        // A disabled tee must not even create the output file.
        assert!(!output_path.exists());
    }

    #[test]
    fn test_process_chunk_pass_through() {
        use adaptive_pipeline_domain::entities::pipeline_stage::StageConfiguration;

        let temp_dir = TempDir::new().unwrap();
        let output_path = temp_dir.path().join("tee-process.bin");
        let path_param = output_path.display().to_string();

        let original_data = b"Test data for pass-through".to_vec();
        let chunk = FileChunk::new(0, 0, original_data.clone(), false).unwrap();

        let config = StageConfiguration {
            algorithm: "tee".to_string(),
            operation: Operation::Forward,
            parameters: params_of(&[("output_path", path_param.as_str()), ("format", "binary")]),
            parallel_processing: false,
            chunk_size: None,
        };

        let mut context = ProcessingContext::new(
            100,
            SecurityContext::new(None, SecurityLevel::Public),
        );

        let service = TeeService::new();
        let result = service.process_chunk(chunk, &config, &mut context).unwrap();

        // The chunk comes back with its payload untouched...
        assert_eq!(result.data(), &original_data);

        // ...and the tee output has been created as a side effect.
        assert!(output_path.exists());
    }

    #[test]
    fn test_stage_service_properties() {
        let service = TeeService::new();

        assert_eq!(service.position(), StagePosition::Any);
        assert!(service.is_reversible());
        assert_eq!(service.stage_type(), StageType::Transform);
    }
}