ddex_builder/
parallel_processing.rs

//! Parallel processing optimizations for the DDEX Builder.
//!
//! This module provides parallel validation, resource processing, and XML generation,
//! using rayon to spread CPU-bound work across threads with the goal of sub-10ms build times.
5
6use crate::ast::{Element, Node};
7use crate::builder::{BuildRequest, ReleaseRequest, TrackRequest};
8use crate::error::BuildError;
9use crate::memory_optimization::BuildMemoryManager;
10use crate::optimized_strings::BuildContext;
11use rayon::prelude::*;
12use std::sync::{Arc, Mutex};
13use std::time::Instant;
14
/// Tuning knobs for the parallel processor.
///
/// Out of the box, any workload of five or more items is eligible for parallel
/// execution, the thread pool size is left to rayon, and both parallel validation
/// and parallel XML generation are switched on.
#[derive(Debug, Clone)]
pub struct ParallelConfig {
    /// Item count at or above which work is dispatched to the thread pool.
    pub parallel_threshold: usize,
    /// Upper bound on worker threads; `None` leaves the pool size to rayon's default.
    pub max_threads: Option<usize>,
    /// Run item validation on the thread pool once the threshold is met.
    pub parallel_validation: bool,
    /// Generate XML sections on the thread pool once the threshold is met.
    pub parallel_xml_generation: bool,
}

impl Default for ParallelConfig {
    fn default() -> Self {
        ParallelConfig {
            max_threads: None,
            parallel_validation: true,
            parallel_xml_generation: true,
            parallel_threshold: 5,
        }
    }
}
38
39/// Parallel processor for DDEX builds
40pub struct ParallelProcessor {
41    config: ParallelConfig,
42    thread_pool: rayon::ThreadPool,
43}
44
45impl ParallelProcessor {
46    /// Create a new parallel processor
47    pub fn new(config: ParallelConfig) -> Result<Self, BuildError> {
48        let thread_pool = if let Some(max_threads) = config.max_threads {
49            rayon::ThreadPoolBuilder::new()
50                .num_threads(max_threads)
51                .build()
52                .map_err(|e| BuildError::Parallel(e.to_string()))?
53        } else {
54            rayon::ThreadPoolBuilder::new()
55                .build()
56                .map_err(|e| BuildError::Parallel(e.to_string()))?
57        };
58
59        Ok(Self {
60            config,
61            thread_pool,
62        })
63    }
64
65    /// Process a build request with parallel optimizations
66    pub fn process_build_parallel(
67        &self,
68        request: &BuildRequest,
69        context: &mut BuildContext,
70        memory_manager: &BuildMemoryManager,
71    ) -> Result<ParallelBuildResult, BuildError> {
72        let start_time = Instant::now();
73
74        // Determine if we should use parallel processing
75        let total_tracks: usize = request.releases.iter().map(|r| r.tracks.len()).sum();
76
77        let use_parallel = total_tracks >= self.config.parallel_threshold;
78
79        let result = if use_parallel {
80            self.process_parallel_impl(request, context, memory_manager)?
81        } else {
82            self.process_sequential_impl(request, context, memory_manager)?
83        };
84
85        let processing_time = start_time.elapsed();
86
87        Ok(ParallelBuildResult {
88            elements: result,
89            processing_time,
90            used_parallel: use_parallel,
91            thread_count: if use_parallel {
92                self.thread_pool.current_num_threads()
93            } else {
94                1
95            },
96            total_tracks,
97        })
98    }
99
100    /// Parallel implementation for large builds
101    fn process_parallel_impl(
102        &self,
103        request: &BuildRequest,
104        _context: &mut BuildContext,
105        _memory_manager: &BuildMemoryManager,
106    ) -> Result<Vec<ProcessedElement>, BuildError> {
107        self.thread_pool.install(|| {
108            // Process releases in parallel
109            let processed_releases: Result<Vec<_>, BuildError> = request
110                .releases
111                .par_iter()
112                .map(|release| self.process_release_parallel(release))
113                .collect();
114
115            let releases = processed_releases?;
116
117            // Combine results
118            Ok(releases.into_iter().flatten().collect())
119        })
120    }
121
122    /// Sequential implementation for small builds
123    fn process_sequential_impl(
124        &self,
125        request: &BuildRequest,
126        _context: &mut BuildContext,
127        _memory_manager: &BuildMemoryManager,
128    ) -> Result<Vec<ProcessedElement>, BuildError> {
129        let mut results = Vec::new();
130
131        for release in &request.releases {
132            let processed = self.process_release_sequential(release)?;
133            results.extend(processed);
134        }
135
136        Ok(results)
137    }
138
139    /// Process a single release in parallel
140    fn process_release_parallel(
141        &self,
142        release: &ReleaseRequest,
143    ) -> Result<Vec<ProcessedElement>, BuildError> {
144        // Process tracks in parallel if there are enough of them
145        if release.tracks.len() >= self.config.parallel_threshold {
146            let processed_tracks: Result<Vec<_>, BuildError> = release
147                .tracks
148                .par_iter()
149                .map(|track| self.process_track(track))
150                .collect();
151
152            let tracks = processed_tracks?;
153
154            // Create release element
155            let release_element = ProcessedElement {
156                name: "Release".to_string(),
157                processing_time: std::time::Duration::from_nanos(1), // Minimal for structure
158                element_count: 1 + tracks.len(),
159            };
160
161            let mut result = vec![release_element];
162            result.extend(tracks);
163            Ok(result)
164        } else {
165            self.process_release_sequential(release)
166        }
167    }
168
169    /// Process a single release sequentially
170    fn process_release_sequential(
171        &self,
172        release: &ReleaseRequest,
173    ) -> Result<Vec<ProcessedElement>, BuildError> {
174        let mut results = Vec::new();
175
176        // Process release
177        let release_element = ProcessedElement {
178            name: "Release".to_string(),
179            processing_time: std::time::Duration::from_nanos(1),
180            element_count: 1,
181        };
182        results.push(release_element);
183
184        // Process tracks
185        for track in &release.tracks {
186            results.push(self.process_track(track)?);
187        }
188
189        Ok(results)
190    }
191
192    /// Process a single track
193    fn process_track(&self, track: &TrackRequest) -> Result<ProcessedElement, BuildError> {
194        let start_time = Instant::now();
195
196        // Simulate track processing work
197        // In reality, this would do validation, resource linking, etc.
198        let _validated = self.validate_track(track)?;
199
200        let processing_time = start_time.elapsed();
201
202        Ok(ProcessedElement {
203            name: format!("Track_{}", track.track_id),
204            processing_time,
205            element_count: 1,
206        })
207    }
208
209    /// Validate a track (can be called in parallel)
210    fn validate_track(&self, track: &TrackRequest) -> Result<ValidatedTrack, BuildError> {
211        // ISRC validation
212        if track.isrc.len() != 12 {
213            return Err(BuildError::Validation(format!(
214                "Invalid ISRC length for track {}: expected 12 characters, got {}",
215                track.track_id,
216                track.isrc.len()
217            )));
218        }
219
220        // Duration validation (basic ISO 8601 check)
221        if !track.duration.starts_with("PT") {
222            return Err(BuildError::Validation(format!(
223                "Invalid duration format for track {}: must start with 'PT'",
224                track.track_id
225            )));
226        }
227
228        // Title validation
229        if track.title.trim().is_empty() {
230            return Err(BuildError::Validation(format!(
231                "Track title cannot be empty for track {}",
232                track.track_id
233            )));
234        }
235
236        Ok(ValidatedTrack {
237            track_id: track.track_id.clone(),
238            isrc: track.isrc.clone(),
239            title: track.title.clone(),
240            duration: track.duration.clone(),
241            artist: track.artist.clone(),
242        })
243    }
244
245    /// Parallel XML section generation for large elements
246    pub fn generate_xml_sections_parallel(
247        &self,
248        elements: &[Element],
249        context: &Arc<Mutex<BuildContext>>,
250    ) -> Result<Vec<String>, BuildError> {
251        if elements.len() < self.config.parallel_threshold || !self.config.parallel_xml_generation {
252            return self.generate_xml_sections_sequential(elements, context);
253        }
254
255        self.thread_pool.install(|| {
256            elements
257                .par_iter()
258                .map(|element| {
259                    // Each thread gets its own temporary context to avoid contention
260                    let mut local_context = BuildContext::new();
261
262                    // Generate XML for this element
263                    self.element_to_xml_string(element, &mut local_context)
264                })
265                .collect()
266        })
267    }
268
269    /// Sequential XML section generation
270    fn generate_xml_sections_sequential(
271        &self,
272        elements: &[Element],
273        context: &Arc<Mutex<BuildContext>>,
274    ) -> Result<Vec<String>, BuildError> {
275        let mut results = Vec::with_capacity(elements.len());
276
277        for element in elements {
278            let mut context = context.lock().unwrap();
279            let xml = self.element_to_xml_string(element, &mut context)?;
280            results.push(xml);
281        }
282
283        Ok(results)
284    }
285
286    /// Convert element to XML string (simplified for example)
287    fn element_to_xml_string(
288        &self,
289        element: &Element,
290        context: &mut BuildContext,
291    ) -> Result<String, BuildError> {
292        // Get buffer from context
293        let mut buffer = context.get_xml_buffer(256);
294
295        buffer.push('<');
296        buffer.push_str(&element.name);
297
298        // Add attributes
299        for (key, value) in &element.attributes {
300            buffer.push_str(&format!(" {}=\"{}\"", key, value));
301        }
302
303        if element.children.is_empty() {
304            buffer.push_str("/>");
305        } else {
306            buffer.push('>');
307
308            // Handle children (simplified)
309            for child in &element.children {
310                match child {
311                    Node::Text(text) => buffer.push_str(text),
312                    Node::Element(child_element) => {
313                        let child_xml = self.element_to_xml_string(child_element, context)?;
314                        buffer.push_str(&child_xml);
315                    }
316                    Node::Comment(comment) => {
317                        buffer.push_str(&comment.to_xml());
318                    }
319                    Node::SimpleComment(comment) => {
320                        buffer.push_str(&format!("<!-- {} -->", comment));
321                    }
322                }
323            }
324
325            buffer.push_str(&format!("</{}>", element.name));
326        }
327
328        let result = buffer.clone();
329        context.return_xml_buffer(buffer);
330
331        Ok(result)
332    }
333
334    /// Parallel validation of multiple items
335    pub fn validate_items_parallel<T, F>(
336        &self,
337        items: &[T],
338        validator: F,
339    ) -> Result<Vec<ValidationResult>, BuildError>
340    where
341        T: Send + Sync,
342        F: Fn(&T) -> Result<(), BuildError> + Send + Sync,
343    {
344        if items.len() < self.config.parallel_threshold || !self.config.parallel_validation {
345            return self.validate_items_sequential(items, validator);
346        }
347
348        let validation_results: Vec<ValidationResult> = self.thread_pool.install(|| {
349            items
350                .par_iter()
351                .map(|item| {
352                    let start_time = Instant::now();
353                    let result = validator(item);
354                    let processing_time = start_time.elapsed();
355
356                    ValidationResult {
357                        success: result.is_ok(),
358                        error: result.err(),
359                        processing_time,
360                    }
361                })
362                .collect()
363        });
364
365        // Check for errors
366        for result in &validation_results {
367            if !result.success {
368                if let Some(ref err) = result.error {
369                    return Err(err.clone());
370                }
371            }
372        }
373
374        Ok(validation_results)
375    }
376
377    /// Sequential validation fallback
378    fn validate_items_sequential<T, F>(
379        &self,
380        items: &[T],
381        validator: F,
382    ) -> Result<Vec<ValidationResult>, BuildError>
383    where
384        F: Fn(&T) -> Result<(), BuildError>,
385    {
386        let mut results = Vec::with_capacity(items.len());
387
388        for item in items {
389            let start_time = Instant::now();
390            let result = validator(item);
391            let processing_time = start_time.elapsed();
392
393            results.push(ValidationResult {
394                success: result.is_ok(),
395                error: result.err(),
396                processing_time,
397            });
398        }
399
400        Ok(results)
401    }
402}
403
404/// Result of parallel processing
405#[derive(Debug)]
406pub struct ParallelBuildResult {
407    /// Processed elements
408    pub elements: Vec<ProcessedElement>,
409    /// Total processing time
410    pub processing_time: std::time::Duration,
411    /// Whether parallel processing was used
412    pub used_parallel: bool,
413    /// Number of threads used
414    pub thread_count: usize,
415    /// Total tracks processed
416    pub total_tracks: usize,
417}
418
419impl ParallelBuildResult {
420    /// Check if build met performance targets
421    pub fn meets_performance_target(&self) -> bool {
422        match self.total_tracks {
423            1 => self.processing_time.as_millis() < 5, // Single track: <5ms
424            2..=20 => self.processing_time.as_millis() < 10, // Album: <10ms
425            _ => self.processing_time.as_millis() < 50, // Large: <50ms
426        }
427    }
428
429    /// Get performance summary
430    pub fn performance_summary(&self) -> String {
431        format!(
432            "Processed {} tracks in {:.2}ms using {} thread(s) ({}parallel)",
433            self.total_tracks,
434            self.processing_time.as_millis(),
435            self.thread_count,
436            if self.used_parallel { "" } else { "non-" }
437        )
438    }
439}
440
/// One unit of build output, such as a release header or a processed track.
#[derive(Debug)]
pub struct ProcessedElement {
    /// Label of the element (for example `"Release"` or `"Track_<track_id>"`).
    pub name: String,
    /// Time spent producing this element.
    pub processing_time: std::time::Duration,
    /// Number of elements this entry accounts for.
    pub element_count: usize,
}
451
/// Timing statistics for a single worker thread.
#[derive(Debug)]
pub struct ThreadMetrics {
    /// Name or identifier of the worker thread.
    pub name: String,
    /// How long the thread spent processing.
    pub processing_time: std::time::Duration,
    /// How many elements the thread handled.
    pub element_count: usize,
}
462
/// Copy of a track's fields, produced only after the track passed validation.
// Nothing in this file reads these fields yet (the validated value is discarded by
// its caller), so `dead_code` is allowed to keep the struct warning-free.
#[derive(Debug)]
#[allow(dead_code)]
struct ValidatedTrack {
    /// Identifier of the source track.
    pub track_id: String,
    /// The track's ISRC.
    pub isrc: String,
    /// The track's title.
    pub title: String,
    /// The track's ISO 8601 duration string.
    pub duration: String,
    /// The track's display artist.
    pub artist: String,
}
473
474/// Result of parallel build operation
475#[derive(Debug)]
476pub struct ValidationResult {
477    /// Whether the build succeeded
478    pub success: bool,
479    /// Error if build failed
480    pub error: Option<BuildError>,
481    /// Time taken for processing
482    pub processing_time: std::time::Duration,
483}
484
485/// Workload analyzer to determine optimal parallel strategy
486pub struct WorkloadAnalyzer;
487
488impl WorkloadAnalyzer {
489    /// Analyze a build request and suggest parallel configuration
490    pub fn analyze_workload(request: &BuildRequest) -> WorkloadAnalysis {
491        let total_releases = request.releases.len();
492        let total_tracks: usize = request.releases.iter().map(|r| r.tracks.len()).sum();
493
494        let max_tracks_per_release = request
495            .releases
496            .iter()
497            .map(|r| r.tracks.len())
498            .max()
499            .unwrap_or(0);
500
501        let complexity_score = Self::calculate_complexity_score(request);
502
503        WorkloadAnalysis {
504            total_releases,
505            total_tracks,
506            max_tracks_per_release,
507            complexity_score,
508            recommended_config: Self::recommend_config(total_tracks, complexity_score),
509        }
510    }
511
512    /// Calculate complexity score for the build
513    fn calculate_complexity_score(request: &BuildRequest) -> f32 {
514        let mut score = 0.0;
515
516        // Base score from track count
517        let total_tracks: usize = request.releases.iter().map(|r| r.tracks.len()).sum();
518        score += total_tracks as f32 * 1.0;
519
520        // Add complexity for multiple releases
521        score += request.releases.len() as f32 * 0.5;
522
523        // Add complexity for deals
524        score += request.deals.len() as f32 * 2.0; // Deals are more complex
525
526        // Add complexity for extensions
527        if request.extensions.is_some() {
528            score += 1.0;
529        }
530
531        score
532    }
533
534    /// Recommend parallel configuration based on workload
535    fn recommend_config(total_tracks: usize, complexity_score: f32) -> ParallelConfig {
536        let parallel_threshold = if complexity_score > 20.0 {
537            3 // Lower threshold for complex builds
538        } else if total_tracks > 50 {
539            5 // Higher threshold for simple large builds
540        } else {
541            10 // Even higher threshold for small builds
542        };
543
544        let max_threads = if total_tracks > 100 {
545            None // Use all available cores for very large builds
546        } else if total_tracks > 20 {
547            Some(num_cpus::get().min(4)) // Cap at 4 cores for medium builds
548        } else {
549            Some(2) // Only 2 cores for small builds
550        };
551
552        ParallelConfig {
553            parallel_threshold,
554            max_threads,
555            parallel_validation: complexity_score > 10.0,
556            parallel_xml_generation: total_tracks > 15,
557        }
558    }
559}
560
561/// Complexity analysis result
562#[derive(Debug)]
563pub struct WorkloadAnalysis {
564    /// Total number of releases
565    pub total_releases: usize,
566    /// Total number of tracks across all releases
567    pub total_tracks: usize,
568    /// Maximum tracks in a single release
569    pub max_tracks_per_release: usize,
570    /// Computed complexity score (0.0-1.0)
571    pub complexity_score: f32,
572    /// Recommended parallel configuration
573    pub recommended_config: ParallelConfig,
574}
575
#[cfg(test)]
mod tests {
    use super::*;
    use crate::builder::{LocalizedStringRequest, MessageHeaderRequest, PartyRequest};

    /// Party with a single, language-neutral name and the given id.
    fn party(name: &str, id: &str) -> PartyRequest {
        PartyRequest {
            party_name: vec![LocalizedStringRequest {
                text: name.to_string(),
                language_code: None,
            }],
            party_id: Some(id.to_string()),
            party_reference: None,
        }
    }

    /// Track with the given identifying fields; the artist is always "Test Artist".
    fn track(
        track_id: &str,
        resource_reference: Option<&str>,
        isrc: &str,
        title: &str,
        duration: &str,
    ) -> TrackRequest {
        TrackRequest {
            track_id: track_id.to_string(),
            resource_reference: resource_reference.map(String::from),
            isrc: isrc.to_string(),
            title: title.to_string(),
            duration: duration.to_string(),
            artist: "Test Artist".to_string(),
        }
    }

    #[test]
    fn test_parallel_processor_creation() {
        assert!(ParallelProcessor::new(ParallelConfig::default()).is_ok());
    }

    #[test]
    fn test_workload_analysis() {
        let request = BuildRequest {
            header: MessageHeaderRequest {
                message_id: Some("TEST_001".to_string()),
                message_sender: party("Test Sender", "SENDER_001"),
                message_recipient: party("Test Recipient", "RECIPIENT_001"),
                message_control_type: None,
                message_created_date_time: None,
            },
            version: "4.3".to_string(),
            profile: None,
            releases: Vec::new(),
            deals: Vec::new(),
            extensions: None,
        };

        let analysis = WorkloadAnalyzer::analyze_workload(&request);
        assert_eq!(analysis.total_tracks, 0);
        assert_eq!(analysis.total_releases, 0);
    }

    #[test]
    fn test_track_validation() {
        let processor = ParallelProcessor::new(ParallelConfig::default()).unwrap();

        // 12-character ISRC, ISO 8601 duration and a non-empty title.
        let valid = track("T001", Some("A001"), "USRC17607839", "Test Track", "PT3M30S");
        assert!(processor.validate_track(&valid).is_ok());

        // Too-short ISRC, empty title and a non-ISO duration.
        let invalid = track("T002", None, "INVALID", "", "3:30");
        assert!(processor.validate_track(&invalid).is_err());
    }

    #[test]
    fn test_performance_target_checking() {
        let build_result =
            |millis: u64, used_parallel: bool, thread_count: usize, total_tracks: usize| {
                ParallelBuildResult {
                    elements: Vec::new(),
                    processing_time: std::time::Duration::from_millis(millis),
                    used_parallel,
                    thread_count,
                    total_tracks,
                }
            };

        // Single track: 3 ms is under the 5 ms target.
        assert!(build_result(3, false, 1, 1).meets_performance_target());

        // Album of 12 tracks: 15 ms exceeds the 10 ms target.
        assert!(!build_result(15, true, 4, 12).meets_performance_target());
    }
}