adaptive_pipeline/infrastructure/services/
tee.rs1#![allow(dead_code, unused_imports, unused_variables)]
10use adaptive_pipeline_domain::entities::{Operation, ProcessingContext, StageConfiguration, StagePosition, StageType};
68use adaptive_pipeline_domain::services::{FromParameters, StageService};
69use adaptive_pipeline_domain::value_objects::file_chunk::FileChunk;
70use adaptive_pipeline_domain::PipelineError;
71use std::collections::HashMap;
72use std::fs::OpenOptions;
73use std::io::Write;
74use std::path::PathBuf;
75
/// Encoding applied to a chunk's bytes before they are appended to the tee file.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum TeeFormat {
    /// Chunk bytes are written verbatim.
    Binary,
    /// Chunk bytes are rendered as a hex dump, 16 bytes per line, with an
    /// offset column and a printable-ASCII column.
    Hex,
    /// Chunk bytes are decoded as UTF-8; invalid sequences are replaced
    /// (lossy conversion).
    Text,
}
86
87#[derive(Debug, Clone, PartialEq, Eq)]
89pub struct TeeConfig {
90 pub output_path: PathBuf,
92 pub format: TeeFormat,
94 pub enabled: bool,
96}
97
98impl Default for TeeConfig {
99 fn default() -> Self {
100 Self {
101 output_path: PathBuf::from("/tmp/tee-output.bin"),
102 format: TeeFormat::Binary,
103 enabled: true,
104 }
105 }
106}
107
108impl FromParameters for TeeConfig {
113 fn from_parameters(params: &HashMap<String, String>) -> Result<Self, PipelineError> {
114 let output_path = params
116 .get("output_path")
117 .ok_or_else(|| PipelineError::MissingParameter("output_path is required for tee stage".into()))?
118 .into();
119
120 let format = params
122 .get("format")
123 .map(|s| match s.to_lowercase().as_str() {
124 "binary" => Ok(TeeFormat::Binary),
125 "hex" => Ok(TeeFormat::Hex),
126 "text" => Ok(TeeFormat::Text),
127 other => Err(PipelineError::InvalidParameter(format!(
128 "Unknown tee format: {}. Valid: binary, hex, text",
129 other
130 ))),
131 })
132 .transpose()?
133 .unwrap_or(TeeFormat::Binary);
134
135 let enabled = params
137 .get("enabled")
138 .map(|s| s.to_lowercase() == "true")
139 .unwrap_or(true);
140
141 Ok(Self {
142 output_path,
143 format,
144 enabled,
145 })
146 }
147}
148
/// Stateless pipeline stage service that copies each chunk's data to a side
/// file and returns the chunk unchanged.
#[derive(Debug, Clone, Copy)]
pub struct TeeService;
166
167impl TeeService {
168 pub fn new() -> Self {
170 Self
171 }
172
173 fn write_tee(&self, data: &[u8], config: &TeeConfig, chunk_seq: u64) -> Result<(), PipelineError> {
175 if !config.enabled {
176 return Ok(());
177 }
178
179 let formatted = match config.format {
181 TeeFormat::Binary => data.to_vec(),
182 TeeFormat::Hex => {
183 let mut output = Vec::new();
185 for (i, chunk) in data.chunks(16).enumerate() {
186 let offset = i * 16;
187 let hex = chunk.iter().map(|b| format!("{:02x}", b)).collect::<Vec<_>>().join(" ");
188 let ascii = chunk
189 .iter()
190 .map(|&b| {
191 if b.is_ascii_graphic() || b == b' ' {
192 b as char
193 } else {
194 '.'
195 }
196 })
197 .collect::<String>();
198 let line = format!("{:08x} {:<48} |{}|\n", offset, hex, ascii);
199 output.extend_from_slice(line.as_bytes());
200 }
201 output
202 }
203 TeeFormat::Text => String::from_utf8_lossy(data).into_owned().into_bytes(),
204 };
205
206 let mut file = OpenOptions::new()
208 .create(true)
209 .append(true)
210 .open(&config.output_path)
211 .map_err(|e| {
212 PipelineError::ProcessingFailed(format!(
213 "Failed to open tee output file '{}': {}",
214 config.output_path.display(),
215 e
216 ))
217 })?;
218
219 writeln!(file, "--- Chunk {} ({} bytes) ---", chunk_seq, data.len())
221 .map_err(|e| PipelineError::ProcessingFailed(format!("Failed to write tee separator: {}", e)))?;
222
223 file.write_all(&formatted)
224 .map_err(|e| PipelineError::ProcessingFailed(format!("Failed to write tee data: {}", e)))?;
225
226 writeln!(file).map_err(|e| PipelineError::ProcessingFailed(format!("Failed to write tee newline: {}", e)))?;
227
228 Ok(())
229 }
230}
231
232impl Default for TeeService {
233 fn default() -> Self {
234 Self::new()
235 }
236}
237
238impl StageService for TeeService {
246 fn process_chunk(
247 &self,
248 chunk: FileChunk,
249 config: &StageConfiguration,
250 _context: &mut ProcessingContext,
251 ) -> Result<FileChunk, PipelineError> {
252 let tee_config = TeeConfig::from_parameters(&config.parameters)?;
254
255 tracing::debug!(
257 chunk_seq = chunk.sequence_number(),
258 output_path = %tee_config.output_path.display(),
259 format = ?tee_config.format,
260 enabled = tee_config.enabled,
261 operation = %config.operation,
262 "Tee operation"
263 );
264
265 self.write_tee(chunk.data(), &tee_config, chunk.sequence_number())?;
266
267 Ok(chunk)
269 }
270
271 fn position(&self) -> StagePosition {
272 StagePosition::Any
275 }
276
277 fn is_reversible(&self) -> bool {
278 true
280 }
281
282 fn stage_type(&self) -> StageType {
283 StageType::Transform
285 }
286}
287
#[cfg(test)]
mod tests {
    use super::*;
    use adaptive_pipeline_domain::entities::{SecurityContext, SecurityLevel};
    use std::fs;
    use std::path::PathBuf;
    use tempfile::TempDir;

    /// Only `output_path` supplied: format and enabled fall back to defaults.
    #[test]
    fn test_from_parameters_minimal() {
        let mut params = HashMap::new();
        params.insert("output_path".to_string(), "/tmp/test.bin".to_string());
        let config = TeeConfig::from_parameters(&params).unwrap();
        assert_eq!(config.output_path, PathBuf::from("/tmp/test.bin"));
        assert_eq!(config.format, TeeFormat::Binary);
        assert!(config.enabled);
    }

    /// All three parameters supplied explicitly.
    #[test]
    fn test_from_parameters_full() {
        let mut params = HashMap::new();
        params.insert("output_path".to_string(), "/tmp/test.hex".to_string());
        params.insert("format".to_string(), "hex".to_string());
        params.insert("enabled".to_string(), "false".to_string());
        let config = TeeConfig::from_parameters(&params).unwrap();
        assert_eq!(config.output_path, PathBuf::from("/tmp/test.hex"));
        assert_eq!(config.format, TeeFormat::Hex);
        assert!(!config.enabled);
    }

    /// A missing `output_path` is an error that names the parameter.
    #[test]
    fn test_from_parameters_missing_output_path() {
        let params = HashMap::new();
        let result = TeeConfig::from_parameters(&params);
        assert!(result.is_err());
        assert!(result.unwrap_err().to_string().contains("output_path"));
    }

    /// An unknown format name is rejected.
    #[test]
    fn test_from_parameters_invalid_format() {
        let mut params = HashMap::new();
        params.insert("output_path".to_string(), "/tmp/test.bin".to_string());
        params.insert("format".to_string(), "invalid".to_string());
        let result = TeeConfig::from_parameters(&params);
        assert!(result.is_err());
    }

    /// Binary format writes the header and the raw bytes.
    #[test]
    fn test_tee_binary_format() {
        let temp_dir = TempDir::new().unwrap();
        let output_path = temp_dir.path().join("tee-binary.bin");

        let service = TeeService::new();
        let config = TeeConfig {
            output_path: output_path.clone(),
            format: TeeFormat::Binary,
            enabled: true,
        };

        let test_data = b"Hello, World!";
        service.write_tee(test_data, &config, 0).unwrap();

        let contents = fs::read_to_string(&output_path).unwrap();
        assert!(contents.contains("Chunk 0"));
        assert!(contents.contains("Hello, World!"));
    }

    /// Hex format writes the header and space-separated hex bytes.
    #[test]
    fn test_tee_hex_format() {
        let temp_dir = TempDir::new().unwrap();
        let output_path = temp_dir.path().join("tee-hex.txt");

        let service = TeeService::new();
        let config = TeeConfig {
            output_path: output_path.clone(),
            format: TeeFormat::Hex,
            enabled: true,
        };

        let test_data = b"Hello, World!";
        service.write_tee(test_data, &config, 0).unwrap();

        let contents = fs::read_to_string(&output_path).unwrap();
        assert!(contents.contains("Chunk 0"));
        // "Hello" in hex.
        assert!(contents.contains("48 65 6c 6c 6f"));
    }

    /// Text format writes the header and the decoded text.
    #[test]
    fn test_tee_text_format() {
        let temp_dir = TempDir::new().unwrap();
        let output_path = temp_dir.path().join("tee-text.txt");

        let service = TeeService::new();
        let config = TeeConfig {
            output_path: output_path.clone(),
            format: TeeFormat::Text,
            enabled: true,
        };

        let test_data = b"Hello, World!";
        service.write_tee(test_data, &config, 0).unwrap();

        let contents = fs::read_to_string(&output_path).unwrap();
        assert!(contents.contains("Chunk 0"));
        assert!(contents.contains("Hello, World!"));
    }

    /// A disabled tee must not create the output file.
    #[test]
    fn test_tee_disabled() {
        let temp_dir = TempDir::new().unwrap();
        let output_path = temp_dir.path().join("tee-disabled.bin");

        let service = TeeService::new();
        let config = TeeConfig {
            output_path: output_path.clone(),
            format: TeeFormat::Binary,
            enabled: false,
        };

        service.write_tee(b"test", &config, 0).unwrap();

        assert!(!output_path.exists());
    }

    /// `process_chunk` returns the chunk data unchanged and creates the tee file.
    #[test]
    fn test_process_chunk_pass_through() {
        use adaptive_pipeline_domain::entities::pipeline_stage::StageConfiguration;

        let temp_dir = TempDir::new().unwrap();
        let output_path = temp_dir.path().join("tee-process.bin");

        let service = TeeService::new();
        let original_data = b"Test data for pass-through".to_vec();
        let chunk = FileChunk::new(0, 0, original_data.clone(), false).unwrap();

        let mut params = HashMap::new();
        params.insert("output_path".to_string(), output_path.display().to_string());
        params.insert("format".to_string(), "binary".to_string());

        let config = StageConfiguration {
            algorithm: "tee".to_string(),
            operation: Operation::Forward,
            parameters: params,
            parallel_processing: false,
            chunk_size: None,
        };

        let mut context = ProcessingContext::new(
            100,
            SecurityContext::new(None, SecurityLevel::Public),
        );

        let result = service.process_chunk(chunk, &config, &mut context).unwrap();

        // Data passes through untouched.
        assert_eq!(result.data(), &original_data);

        // The side file was written.
        assert!(output_path.exists());
    }

    /// Static stage properties reported by the service.
    #[test]
    fn test_stage_service_properties() {
        let service = TeeService::new();

        assert_eq!(service.position(), StagePosition::Any);
        assert!(service.is_reversible());
        assert_eq!(service.stage_type(), StageType::Transform);
    }
}