ddex_builder/
parallel_processing.rs

//! Parallel processing optimizations for DDEX Builder
//!
//! This module provides parallel validation, resource processing, and XML generation
//! using rayon for CPU-bound operations to achieve sub-10ms build times.
5
6use crate::ast::{Element, Node};
7use crate::builder::{BuildRequest, ReleaseRequest, TrackRequest};
8use crate::error::BuildError;
9use crate::optimized_strings::BuildContext;
10use crate::memory_optimization::BuildMemoryManager;
11use indexmap::IndexMap;
12use rayon::prelude::*;
13use std::sync::{Arc, Mutex};
14use std::time::Instant;
15
/// Settings that decide when and how work is spread across threads.
#[derive(Debug, Clone)]
pub struct ParallelConfig {
    /// Minimum number of items required before a parallel path is taken
    pub parallel_threshold: usize,
    /// Maximum number of threads to use (`None` = use all available)
    pub max_threads: Option<usize>,
    /// Whether validation may run in parallel
    pub parallel_validation: bool,
    /// Whether XML generation may run in parallel
    pub parallel_xml_generation: bool,
}

impl Default for ParallelConfig {
    /// Defaults: go parallel from 5 items upwards, no thread cap, and both
    /// parallel validation and parallel XML generation enabled.
    fn default() -> Self {
        Self {
            parallel_threshold: 5, // Parallel processing for 5+ items
            max_threads: None,     // Use all available cores
            parallel_validation: true,
            parallel_xml_generation: true,
        }
    }
}
39
40/// Parallel processor for DDEX builds
41pub struct ParallelProcessor {
42    config: ParallelConfig,
43    thread_pool: rayon::ThreadPool,
44}
45
46impl ParallelProcessor {
47    /// Create a new parallel processor
48    pub fn new(config: ParallelConfig) -> Result<Self, BuildError> {
49        let thread_pool = if let Some(max_threads) = config.max_threads {
50            rayon::ThreadPoolBuilder::new()
51                .num_threads(max_threads)
52                .build()
53                .map_err(|e| BuildError::Parallel(e.to_string()))?
54        } else {
55            rayon::ThreadPoolBuilder::new()
56                .build()
57                .map_err(|e| BuildError::Parallel(e.to_string()))?
58        };
59        
60        Ok(Self {
61            config,
62            thread_pool,
63        })
64    }
65    
66    /// Process a build request with parallel optimizations
67    pub fn process_build_parallel(
68        &self,
69        request: &BuildRequest,
70        context: &mut BuildContext,
71        memory_manager: &BuildMemoryManager,
72    ) -> Result<ParallelBuildResult, BuildError> {
73        let start_time = Instant::now();
74        
75        // Determine if we should use parallel processing
76        let total_tracks: usize = request.releases.iter()
77            .map(|r| r.tracks.len())
78            .sum();
79        
80        let use_parallel = total_tracks >= self.config.parallel_threshold;
81        
82        let result = if use_parallel {
83            self.process_parallel_impl(request, context, memory_manager)?
84        } else {
85            self.process_sequential_impl(request, context, memory_manager)?
86        };
87        
88        let processing_time = start_time.elapsed();
89        
90        Ok(ParallelBuildResult {
91            elements: result,
92            processing_time,
93            used_parallel: use_parallel,
94            thread_count: if use_parallel { 
95                self.thread_pool.current_num_threads() 
96            } else { 
97                1 
98            },
99            total_tracks,
100        })
101    }
102    
103    /// Parallel implementation for large builds
104    fn process_parallel_impl(
105        &self,
106        request: &BuildRequest,
107        _context: &mut BuildContext,
108        _memory_manager: &BuildMemoryManager,
109    ) -> Result<Vec<ProcessedElement>, BuildError> {
110        self.thread_pool.install(|| {
111            // Process releases in parallel
112            let processed_releases: Result<Vec<_>, BuildError> = request.releases
113                .par_iter()
114                .map(|release| self.process_release_parallel(release))
115                .collect();
116            
117            let releases = processed_releases?;
118            
119            // Combine results
120            Ok(releases.into_iter().flatten().collect())
121        })
122    }
123    
124    /// Sequential implementation for small builds
125    fn process_sequential_impl(
126        &self,
127        request: &BuildRequest,
128        _context: &mut BuildContext,
129        _memory_manager: &BuildMemoryManager,
130    ) -> Result<Vec<ProcessedElement>, BuildError> {
131        let mut results = Vec::new();
132        
133        for release in &request.releases {
134            let processed = self.process_release_sequential(release)?;
135            results.extend(processed);
136        }
137        
138        Ok(results)
139    }
140    
141    /// Process a single release in parallel
142    fn process_release_parallel(&self, release: &ReleaseRequest) -> Result<Vec<ProcessedElement>, BuildError> {
143        // Process tracks in parallel if there are enough of them
144        if release.tracks.len() >= self.config.parallel_threshold {
145            let processed_tracks: Result<Vec<_>, BuildError> = release.tracks
146                .par_iter()
147                .map(|track| self.process_track(track))
148                .collect();
149            
150            let tracks = processed_tracks?;
151            
152            // Create release element
153            let release_element = ProcessedElement {
154                name: "Release".to_string(),
155                processing_time: std::time::Duration::from_nanos(1), // Minimal for structure
156                element_count: 1 + tracks.len(),
157            };
158            
159            let mut result = vec![release_element];
160            result.extend(tracks);
161            Ok(result)
162        } else {
163            self.process_release_sequential(release)
164        }
165    }
166    
167    /// Process a single release sequentially
168    fn process_release_sequential(&self, release: &ReleaseRequest) -> Result<Vec<ProcessedElement>, BuildError> {
169        let mut results = Vec::new();
170        
171        // Process release
172        let release_element = ProcessedElement {
173            name: "Release".to_string(),
174            processing_time: std::time::Duration::from_nanos(1),
175            element_count: 1,
176        };
177        results.push(release_element);
178        
179        // Process tracks
180        for track in &release.tracks {
181            results.push(self.process_track(track)?);
182        }
183        
184        Ok(results)
185    }
186    
187    /// Process a single track
188    fn process_track(&self, track: &TrackRequest) -> Result<ProcessedElement, BuildError> {
189        let start_time = Instant::now();
190        
191        // Simulate track processing work
192        // In reality, this would do validation, resource linking, etc.
193        let _validated = self.validate_track(track)?;
194        
195        let processing_time = start_time.elapsed();
196        
197        Ok(ProcessedElement {
198            name: format!("Track_{}", track.track_id),
199            processing_time,
200            element_count: 1,
201        })
202    }
203    
204    /// Validate a track (can be called in parallel)
205    fn validate_track(&self, track: &TrackRequest) -> Result<ValidatedTrack, BuildError> {
206        // ISRC validation
207        if track.isrc.len() != 12 {
208            return Err(BuildError::Validation(format!(
209                "Invalid ISRC length for track {}: expected 12 characters, got {}", 
210                track.track_id, 
211                track.isrc.len()
212            )));
213        }
214        
215        // Duration validation (basic ISO 8601 check)
216        if !track.duration.starts_with("PT") {
217            return Err(BuildError::Validation(format!(
218                "Invalid duration format for track {}: must start with 'PT'", 
219                track.track_id
220            )));
221        }
222        
223        // Title validation
224        if track.title.trim().is_empty() {
225            return Err(BuildError::Validation(format!(
226                "Track title cannot be empty for track {}", 
227                track.track_id
228            )));
229        }
230        
231        Ok(ValidatedTrack {
232            track_id: track.track_id.clone(),
233            isrc: track.isrc.clone(),
234            title: track.title.clone(),
235            duration: track.duration.clone(),
236            artist: track.artist.clone(),
237        })
238    }
239    
240    /// Parallel XML section generation for large elements
241    pub fn generate_xml_sections_parallel(
242        &self,
243        elements: &[Element],
244        context: &Arc<Mutex<BuildContext>>,
245    ) -> Result<Vec<String>, BuildError> {
246        if elements.len() < self.config.parallel_threshold || !self.config.parallel_xml_generation {
247            return self.generate_xml_sections_sequential(elements, context);
248        }
249        
250        self.thread_pool.install(|| {
251            elements.par_iter()
252                .map(|element| {
253                    // Each thread gets its own temporary context to avoid contention
254                    let mut local_context = BuildContext::new();
255                    
256                    // Generate XML for this element
257                    self.element_to_xml_string(element, &mut local_context)
258                })
259                .collect()
260        })
261    }
262    
263    /// Sequential XML section generation
264    fn generate_xml_sections_sequential(
265        &self,
266        elements: &[Element],
267        context: &Arc<Mutex<BuildContext>>,
268    ) -> Result<Vec<String>, BuildError> {
269        let mut results = Vec::with_capacity(elements.len());
270        
271        for element in elements {
272            let mut context = context.lock().unwrap();
273            let xml = self.element_to_xml_string(element, &mut context)?;
274            results.push(xml);
275        }
276        
277        Ok(results)
278    }
279    
280    /// Convert element to XML string (simplified for example)
281    fn element_to_xml_string(&self, element: &Element, context: &mut BuildContext) -> Result<String, BuildError> {
282        // Get buffer from context
283        let mut buffer = context.get_xml_buffer(256);
284        
285        buffer.push('<');
286        buffer.push_str(&element.name);
287        
288        // Add attributes
289        for (key, value) in &element.attributes {
290            buffer.push_str(&format!(" {}=\"{}\"", key, value));
291        }
292        
293        if element.children.is_empty() {
294            buffer.push_str("/>");
295        } else {
296            buffer.push('>');
297            
298            // Handle children (simplified)
299            for child in &element.children {
300                match child {
301                    Node::Text(text) => buffer.push_str(text),
302                    Node::Element(child_element) => {
303                        let child_xml = self.element_to_xml_string(child_element, context)?;
304                        buffer.push_str(&child_xml);
305                    }
306                    Node::Comment(comment) => {
307                        buffer.push_str(&comment.to_xml());
308                    }
309                    Node::SimpleComment(comment) => {
310                        buffer.push_str(&format!("<!-- {} -->", comment));
311                    }
312                }
313            }
314            
315            buffer.push_str(&format!("</{}>", element.name));
316        }
317        
318        let result = buffer.clone();
319        context.return_xml_buffer(buffer);
320        
321        Ok(result)
322    }
323    
324    /// Parallel validation of multiple items
325    pub fn validate_items_parallel<T, F>(
326        &self,
327        items: &[T],
328        validator: F,
329    ) -> Result<Vec<ValidationResult>, BuildError>
330    where
331        T: Send + Sync,
332        F: Fn(&T) -> Result<(), BuildError> + Send + Sync,
333    {
334        if items.len() < self.config.parallel_threshold || !self.config.parallel_validation {
335            return self.validate_items_sequential(items, validator);
336        }
337        
338        let validation_results: Vec<ValidationResult> = self.thread_pool.install(|| {
339            items.par_iter()
340                .map(|item| {
341                    let start_time = Instant::now();
342                    let result = validator(item);
343                    let processing_time = start_time.elapsed();
344                    
345                    ValidationResult {
346                        success: result.is_ok(),
347                        error: result.err(),
348                        processing_time,
349                    }
350                })
351                .collect()
352        });
353        
354        // Check for errors
355        for result in &validation_results {
356            if !result.success {
357                if let Some(ref err) = result.error {
358                    return Err(err.clone());
359                }
360            }
361        }
362        
363        Ok(validation_results)
364    }
365    
366    /// Sequential validation fallback
367    fn validate_items_sequential<T, F>(
368        &self,
369        items: &[T],
370        validator: F,
371    ) -> Result<Vec<ValidationResult>, BuildError>
372    where
373        F: Fn(&T) -> Result<(), BuildError>,
374    {
375        let mut results = Vec::with_capacity(items.len());
376        
377        for item in items {
378            let start_time = Instant::now();
379            let result = validator(item);
380            let processing_time = start_time.elapsed();
381            
382            results.push(ValidationResult {
383                success: result.is_ok(),
384                error: result.err(),
385                processing_time,
386            });
387        }
388        
389        Ok(results)
390    }
391}
392
393/// Result of parallel build processing
394#[derive(Debug)]
395pub struct ParallelBuildResult {
396    pub elements: Vec<ProcessedElement>,
397    pub processing_time: std::time::Duration,
398    pub used_parallel: bool,
399    pub thread_count: usize,
400    pub total_tracks: usize,
401}
402
403impl ParallelBuildResult {
404    /// Check if build met performance targets
405    pub fn meets_performance_target(&self) -> bool {
406        match self.total_tracks {
407            1 => self.processing_time.as_millis() < 5,      // Single track: <5ms
408            2..=20 => self.processing_time.as_millis() < 10, // Album: <10ms
409            _ => self.processing_time.as_millis() < 50,      // Large: <50ms
410        }
411    }
412    
413    /// Get performance summary
414    pub fn performance_summary(&self) -> String {
415        format!(
416            "Processed {} tracks in {:.2}ms using {} thread(s) ({}parallel)",
417            self.total_tracks,
418            self.processing_time.as_millis(),
419            self.thread_count,
420            if self.used_parallel { "" } else { "non-" }
421        )
422    }
423}
424
/// Processed element information
#[derive(Debug)]
pub struct ProcessedElement {
    /// Display name of the element, e.g. "Release" or "Track_<id>"
    pub name: String,
    /// Time spent processing this element
    pub processing_time: std::time::Duration,
    /// Number of elements this entry accounts for
    pub element_count: usize,
}
432
/// Validated track data: copies of the track fields that passed validation
#[derive(Debug)]
struct ValidatedTrack {
    /// Identifier of the track within the request
    pub track_id: String,
    /// ISRC of the track
    pub isrc: String,
    /// Track title
    pub title: String,
    /// Track duration as given in the request (starts with "PT")
    pub duration: String,
    /// Track artist
    pub artist: String,
}
442
443/// Validation result
444#[derive(Debug)]
445pub struct ValidationResult {
446    pub success: bool,
447    pub error: Option<BuildError>,
448    pub processing_time: std::time::Duration,
449}
450
451/// Workload analyzer to determine optimal parallel strategy
452pub struct WorkloadAnalyzer;
453
454impl WorkloadAnalyzer {
455    /// Analyze a build request and suggest parallel configuration
456    pub fn analyze_workload(request: &BuildRequest) -> WorkloadAnalysis {
457        let total_releases = request.releases.len();
458        let total_tracks: usize = request.releases.iter()
459            .map(|r| r.tracks.len())
460            .sum();
461        
462        let max_tracks_per_release = request.releases.iter()
463            .map(|r| r.tracks.len())
464            .max()
465            .unwrap_or(0);
466        
467        let complexity_score = Self::calculate_complexity_score(request);
468        
469        WorkloadAnalysis {
470            total_releases,
471            total_tracks,
472            max_tracks_per_release,
473            complexity_score,
474            recommended_config: Self::recommend_config(total_tracks, complexity_score),
475        }
476    }
477    
478    /// Calculate complexity score for the build
479    fn calculate_complexity_score(request: &BuildRequest) -> f32 {
480        let mut score = 0.0;
481        
482        // Base score from track count
483        let total_tracks: usize = request.releases.iter()
484            .map(|r| r.tracks.len())
485            .sum();
486        score += total_tracks as f32 * 1.0;
487        
488        // Add complexity for multiple releases
489        score += request.releases.len() as f32 * 0.5;
490        
491        // Add complexity for deals
492        score += request.deals.len() as f32 * 2.0; // Deals are more complex
493        
494        // Add complexity for extensions
495        if request.extensions.is_some() {
496            score += 1.0;
497        }
498        
499        score
500    }
501    
502    /// Recommend parallel configuration based on workload
503    fn recommend_config(total_tracks: usize, complexity_score: f32) -> ParallelConfig {
504        let parallel_threshold = if complexity_score > 20.0 {
505            3 // Lower threshold for complex builds
506        } else if total_tracks > 50 {
507            5 // Higher threshold for simple large builds
508        } else {
509            10 // Even higher threshold for small builds
510        };
511        
512        let max_threads = if total_tracks > 100 {
513            None // Use all available cores for very large builds
514        } else if total_tracks > 20 {
515            Some(num_cpus::get().min(4)) // Cap at 4 cores for medium builds
516        } else {
517            Some(2) // Only 2 cores for small builds
518        };
519        
520        ParallelConfig {
521            parallel_threshold,
522            max_threads,
523            parallel_validation: complexity_score > 10.0,
524            parallel_xml_generation: total_tracks > 15,
525        }
526    }
527}
528
529/// Workload analysis result
530#[derive(Debug)]
531pub struct WorkloadAnalysis {
532    pub total_releases: usize,
533    pub total_tracks: usize,
534    pub max_tracks_per_release: usize,
535    pub complexity_score: f32,
536    pub recommended_config: ParallelConfig,
537}
538
#[cfg(test)]
mod tests {
    use super::*;
    use crate::builder::{LocalizedStringRequest, PartyRequest, MessageHeaderRequest};

    /// A processor can be built from the default configuration.
    #[test]
    fn test_parallel_processor_creation() {
        let config = ParallelConfig::default();
        let processor = ParallelProcessor::new(config);
        assert!(processor.is_ok());
    }

    /// A request without releases is analyzed as zero releases and zero tracks.
    #[test]
    fn test_workload_analysis() {
        let request = BuildRequest {
            header: MessageHeaderRequest {
                message_id: Some("TEST_001".to_string()),
                message_sender: PartyRequest {
                    party_name: vec![LocalizedStringRequest {
                        text: "Test Sender".to_string(),
                        language_code: None,
                    }],
                    party_id: Some("SENDER_001".to_string()),
                    party_reference: None,
                },
                message_recipient: PartyRequest {
                    party_name: vec![LocalizedStringRequest {
                        text: "Test Recipient".to_string(),
                        language_code: None,
                    }],
                    party_id: Some("RECIPIENT_001".to_string()),
                    party_reference: None,
                },
                message_control_type: None,
                message_created_date_time: None,
            },
            version: "4.3".to_string(),
            profile: None,
            releases: vec![],
            deals: vec![],
            extensions: None,
        };

        let analysis = WorkloadAnalyzer::analyze_workload(&request);
        assert_eq!(analysis.total_tracks, 0);
        assert_eq!(analysis.total_releases, 0);
    }

    /// A well-formed track passes validation; a malformed one is rejected.
    #[test]
    fn test_track_validation() {
        let config = ParallelConfig::default();
        let processor = ParallelProcessor::new(config).unwrap();

        let valid_track = TrackRequest {
            track_id: "T001".to_string(),
            resource_reference: Some("A001".to_string()),
            isrc: "USRC17607839".to_string(), // 12 chars
            title: "Test Track".to_string(),
            duration: "PT3M30S".to_string(),
            artist: "Test Artist".to_string(),
        };

        let result = processor.validate_track(&valid_track);
        assert!(result.is_ok());

        let invalid_track = TrackRequest {
            track_id: "T002".to_string(),
            resource_reference: None,
            isrc: "INVALID".to_string(), // Too short
            title: "".to_string(), // Empty
            duration: "3:30".to_string(), // Wrong format
            artist: "Test Artist".to_string(),
        };

        let result = processor.validate_track(&invalid_track);
        assert!(result.is_err());
    }

    /// Performance budgets depend on the track count.
    #[test]
    fn test_performance_target_checking() {
        let result = ParallelBuildResult {
            elements: vec![],
            processing_time: std::time::Duration::from_millis(3),
            used_parallel: false,
            thread_count: 1,
            total_tracks: 1,
        };

        assert!(result.meets_performance_target()); // 3ms < 5ms target for single track

        let slow_result = ParallelBuildResult {
            elements: vec![],
            processing_time: std::time::Duration::from_millis(15),
            used_parallel: true,
            thread_count: 4,
            total_tracks: 12,
        };

        assert!(!slow_result.meets_performance_target()); // 15ms > 10ms target for album
    }
}