1use crate::ast::{Element, Node};
7use crate::builder::{BuildRequest, ReleaseRequest, TrackRequest};
8use crate::error::BuildError;
9use crate::memory_optimization::BuildMemoryManager;
10use crate::optimized_strings::BuildContext;
11use rayon::prelude::*;
12use std::sync::{Arc, Mutex};
13use std::time::Instant;
14
15#[derive(Debug, Clone)]
17pub struct ParallelConfig {
18 pub parallel_threshold: usize,
20 pub max_threads: Option<usize>,
22 pub parallel_validation: bool,
24 pub parallel_xml_generation: bool,
26}
27
28impl Default for ParallelConfig {
29 fn default() -> Self {
30 Self {
31 parallel_threshold: 5, max_threads: None, parallel_validation: true,
34 parallel_xml_generation: true,
35 }
36 }
37}
38
39pub struct ParallelProcessor {
41 config: ParallelConfig,
42 thread_pool: rayon::ThreadPool,
43}
44
45impl ParallelProcessor {
46 pub fn new(config: ParallelConfig) -> Result<Self, BuildError> {
48 let thread_pool = if let Some(max_threads) = config.max_threads {
49 rayon::ThreadPoolBuilder::new()
50 .num_threads(max_threads)
51 .build()
52 .map_err(|e| BuildError::Parallel(e.to_string()))?
53 } else {
54 rayon::ThreadPoolBuilder::new()
55 .build()
56 .map_err(|e| BuildError::Parallel(e.to_string()))?
57 };
58
59 Ok(Self {
60 config,
61 thread_pool,
62 })
63 }
64
65 pub fn process_build_parallel(
67 &self,
68 request: &BuildRequest,
69 context: &mut BuildContext,
70 memory_manager: &BuildMemoryManager,
71 ) -> Result<ParallelBuildResult, BuildError> {
72 let start_time = Instant::now();
73
74 let total_tracks: usize = request.releases.iter().map(|r| r.tracks.len()).sum();
76
77 let use_parallel = total_tracks >= self.config.parallel_threshold;
78
79 let result = if use_parallel {
80 self.process_parallel_impl(request, context, memory_manager)?
81 } else {
82 self.process_sequential_impl(request, context, memory_manager)?
83 };
84
85 let processing_time = start_time.elapsed();
86
87 Ok(ParallelBuildResult {
88 elements: result,
89 processing_time,
90 used_parallel: use_parallel,
91 thread_count: if use_parallel {
92 self.thread_pool.current_num_threads()
93 } else {
94 1
95 },
96 total_tracks,
97 })
98 }
99
100 fn process_parallel_impl(
102 &self,
103 request: &BuildRequest,
104 _context: &mut BuildContext,
105 _memory_manager: &BuildMemoryManager,
106 ) -> Result<Vec<ProcessedElement>, BuildError> {
107 self.thread_pool.install(|| {
108 let processed_releases: Result<Vec<_>, BuildError> = request
110 .releases
111 .par_iter()
112 .map(|release| self.process_release_parallel(release))
113 .collect();
114
115 let releases = processed_releases?;
116
117 Ok(releases.into_iter().flatten().collect())
119 })
120 }
121
122 fn process_sequential_impl(
124 &self,
125 request: &BuildRequest,
126 _context: &mut BuildContext,
127 _memory_manager: &BuildMemoryManager,
128 ) -> Result<Vec<ProcessedElement>, BuildError> {
129 let mut results = Vec::new();
130
131 for release in &request.releases {
132 let processed = self.process_release_sequential(release)?;
133 results.extend(processed);
134 }
135
136 Ok(results)
137 }
138
139 fn process_release_parallel(
141 &self,
142 release: &ReleaseRequest,
143 ) -> Result<Vec<ProcessedElement>, BuildError> {
144 if release.tracks.len() >= self.config.parallel_threshold {
146 let processed_tracks: Result<Vec<_>, BuildError> = release
147 .tracks
148 .par_iter()
149 .map(|track| self.process_track(track))
150 .collect();
151
152 let tracks = processed_tracks?;
153
154 let release_element = ProcessedElement {
156 name: "Release".to_string(),
157 processing_time: std::time::Duration::from_nanos(1), element_count: 1 + tracks.len(),
159 };
160
161 let mut result = vec![release_element];
162 result.extend(tracks);
163 Ok(result)
164 } else {
165 self.process_release_sequential(release)
166 }
167 }
168
169 fn process_release_sequential(
171 &self,
172 release: &ReleaseRequest,
173 ) -> Result<Vec<ProcessedElement>, BuildError> {
174 let mut results = Vec::new();
175
176 let release_element = ProcessedElement {
178 name: "Release".to_string(),
179 processing_time: std::time::Duration::from_nanos(1),
180 element_count: 1,
181 };
182 results.push(release_element);
183
184 for track in &release.tracks {
186 results.push(self.process_track(track)?);
187 }
188
189 Ok(results)
190 }
191
192 fn process_track(&self, track: &TrackRequest) -> Result<ProcessedElement, BuildError> {
194 let start_time = Instant::now();
195
196 let _validated = self.validate_track(track)?;
199
200 let processing_time = start_time.elapsed();
201
202 Ok(ProcessedElement {
203 name: format!("Track_{}", track.track_id),
204 processing_time,
205 element_count: 1,
206 })
207 }
208
209 fn validate_track(&self, track: &TrackRequest) -> Result<ValidatedTrack, BuildError> {
211 if track.isrc.len() != 12 {
213 return Err(BuildError::Validation(format!(
214 "Invalid ISRC length for track {}: expected 12 characters, got {}",
215 track.track_id,
216 track.isrc.len()
217 )));
218 }
219
220 if !track.duration.starts_with("PT") {
222 return Err(BuildError::Validation(format!(
223 "Invalid duration format for track {}: must start with 'PT'",
224 track.track_id
225 )));
226 }
227
228 if track.title.trim().is_empty() {
230 return Err(BuildError::Validation(format!(
231 "Track title cannot be empty for track {}",
232 track.track_id
233 )));
234 }
235
236 Ok(ValidatedTrack {
237 track_id: track.track_id.clone(),
238 isrc: track.isrc.clone(),
239 title: track.title.clone(),
240 duration: track.duration.clone(),
241 artist: track.artist.clone(),
242 })
243 }
244
245 pub fn generate_xml_sections_parallel(
247 &self,
248 elements: &[Element],
249 context: &Arc<Mutex<BuildContext>>,
250 ) -> Result<Vec<String>, BuildError> {
251 if elements.len() < self.config.parallel_threshold || !self.config.parallel_xml_generation {
252 return self.generate_xml_sections_sequential(elements, context);
253 }
254
255 self.thread_pool.install(|| {
256 elements
257 .par_iter()
258 .map(|element| {
259 let mut local_context = BuildContext::new();
261
262 self.element_to_xml_string(element, &mut local_context)
264 })
265 .collect()
266 })
267 }
268
269 fn generate_xml_sections_sequential(
271 &self,
272 elements: &[Element],
273 context: &Arc<Mutex<BuildContext>>,
274 ) -> Result<Vec<String>, BuildError> {
275 let mut results = Vec::with_capacity(elements.len());
276
277 for element in elements {
278 let mut context = context.lock().unwrap();
279 let xml = self.element_to_xml_string(element, &mut context)?;
280 results.push(xml);
281 }
282
283 Ok(results)
284 }
285
286 fn element_to_xml_string(
288 &self,
289 element: &Element,
290 context: &mut BuildContext,
291 ) -> Result<String, BuildError> {
292 let mut buffer = context.get_xml_buffer(256);
294
295 buffer.push('<');
296 buffer.push_str(&element.name);
297
298 for (key, value) in &element.attributes {
300 buffer.push_str(&format!(" {}=\"{}\"", key, value));
301 }
302
303 if element.children.is_empty() {
304 buffer.push_str("/>");
305 } else {
306 buffer.push('>');
307
308 for child in &element.children {
310 match child {
311 Node::Text(text) => buffer.push_str(text),
312 Node::Element(child_element) => {
313 let child_xml = self.element_to_xml_string(child_element, context)?;
314 buffer.push_str(&child_xml);
315 }
316 Node::Comment(comment) => {
317 buffer.push_str(&comment.to_xml());
318 }
319 Node::SimpleComment(comment) => {
320 buffer.push_str(&format!("<!-- {} -->", comment));
321 }
322 }
323 }
324
325 buffer.push_str(&format!("</{}>", element.name));
326 }
327
328 let result = buffer.clone();
329 context.return_xml_buffer(buffer);
330
331 Ok(result)
332 }
333
334 pub fn validate_items_parallel<T, F>(
336 &self,
337 items: &[T],
338 validator: F,
339 ) -> Result<Vec<ValidationResult>, BuildError>
340 where
341 T: Send + Sync,
342 F: Fn(&T) -> Result<(), BuildError> + Send + Sync,
343 {
344 if items.len() < self.config.parallel_threshold || !self.config.parallel_validation {
345 return self.validate_items_sequential(items, validator);
346 }
347
348 let validation_results: Vec<ValidationResult> = self.thread_pool.install(|| {
349 items
350 .par_iter()
351 .map(|item| {
352 let start_time = Instant::now();
353 let result = validator(item);
354 let processing_time = start_time.elapsed();
355
356 ValidationResult {
357 success: result.is_ok(),
358 error: result.err(),
359 processing_time,
360 }
361 })
362 .collect()
363 });
364
365 for result in &validation_results {
367 if !result.success {
368 if let Some(ref err) = result.error {
369 return Err(err.clone());
370 }
371 }
372 }
373
374 Ok(validation_results)
375 }
376
377 fn validate_items_sequential<T, F>(
379 &self,
380 items: &[T],
381 validator: F,
382 ) -> Result<Vec<ValidationResult>, BuildError>
383 where
384 F: Fn(&T) -> Result<(), BuildError>,
385 {
386 let mut results = Vec::with_capacity(items.len());
387
388 for item in items {
389 let start_time = Instant::now();
390 let result = validator(item);
391 let processing_time = start_time.elapsed();
392
393 results.push(ValidationResult {
394 success: result.is_ok(),
395 error: result.err(),
396 processing_time,
397 });
398 }
399
400 Ok(results)
401 }
402}
403
404#[derive(Debug)]
406pub struct ParallelBuildResult {
407 pub elements: Vec<ProcessedElement>,
409 pub processing_time: std::time::Duration,
411 pub used_parallel: bool,
413 pub thread_count: usize,
415 pub total_tracks: usize,
417}
418
419impl ParallelBuildResult {
420 pub fn meets_performance_target(&self) -> bool {
422 match self.total_tracks {
423 1 => self.processing_time.as_millis() < 5, 2..=20 => self.processing_time.as_millis() < 10, _ => self.processing_time.as_millis() < 50, }
427 }
428
429 pub fn performance_summary(&self) -> String {
431 format!(
432 "Processed {} tracks in {:.2}ms using {} thread(s) ({}parallel)",
433 self.total_tracks,
434 self.processing_time.as_millis(),
435 self.thread_count,
436 if self.used_parallel { "" } else { "non-" }
437 )
438 }
439}
440
441#[derive(Debug)]
443pub struct ProcessedElement {
444 pub name: String,
446 pub processing_time: std::time::Duration,
448 pub element_count: usize,
450}
451
452#[derive(Debug)]
454pub struct ThreadMetrics {
455 pub name: String,
457 pub processing_time: std::time::Duration,
459 pub element_count: usize,
461}
462
463#[derive(Debug)]
465#[allow(dead_code)]
466struct ValidatedTrack {
467 pub track_id: String,
468 pub isrc: String,
469 pub title: String,
470 pub duration: String,
471 pub artist: String,
472}
473
474#[derive(Debug)]
476pub struct ValidationResult {
477 pub success: bool,
479 pub error: Option<BuildError>,
481 pub processing_time: std::time::Duration,
483}
484
485pub struct WorkloadAnalyzer;
487
488impl WorkloadAnalyzer {
489 pub fn analyze_workload(request: &BuildRequest) -> WorkloadAnalysis {
491 let total_releases = request.releases.len();
492 let total_tracks: usize = request.releases.iter().map(|r| r.tracks.len()).sum();
493
494 let max_tracks_per_release = request
495 .releases
496 .iter()
497 .map(|r| r.tracks.len())
498 .max()
499 .unwrap_or(0);
500
501 let complexity_score = Self::calculate_complexity_score(request);
502
503 WorkloadAnalysis {
504 total_releases,
505 total_tracks,
506 max_tracks_per_release,
507 complexity_score,
508 recommended_config: Self::recommend_config(total_tracks, complexity_score),
509 }
510 }
511
512 fn calculate_complexity_score(request: &BuildRequest) -> f32 {
514 let mut score = 0.0;
515
516 let total_tracks: usize = request.releases.iter().map(|r| r.tracks.len()).sum();
518 score += total_tracks as f32 * 1.0;
519
520 score += request.releases.len() as f32 * 0.5;
522
523 score += request.deals.len() as f32 * 2.0; if request.extensions.is_some() {
528 score += 1.0;
529 }
530
531 score
532 }
533
534 fn recommend_config(total_tracks: usize, complexity_score: f32) -> ParallelConfig {
536 let parallel_threshold = if complexity_score > 20.0 {
537 3 } else if total_tracks > 50 {
539 5 } else {
541 10 };
543
544 let max_threads = if total_tracks > 100 {
545 None } else if total_tracks > 20 {
547 Some(num_cpus::get().min(4)) } else {
549 Some(2) };
551
552 ParallelConfig {
553 parallel_threshold,
554 max_threads,
555 parallel_validation: complexity_score > 10.0,
556 parallel_xml_generation: total_tracks > 15,
557 }
558 }
559}
560
561#[derive(Debug)]
563pub struct WorkloadAnalysis {
564 pub total_releases: usize,
566 pub total_tracks: usize,
568 pub max_tracks_per_release: usize,
570 pub complexity_score: f32,
572 pub recommended_config: ParallelConfig,
574}
575
576#[cfg(test)]
577mod tests {
578 use super::*;
579 use crate::builder::{LocalizedStringRequest, MessageHeaderRequest, PartyRequest};
580
581 #[test]
582 fn test_parallel_processor_creation() {
583 let config = ParallelConfig::default();
584 let processor = ParallelProcessor::new(config);
585 assert!(processor.is_ok());
586 }
587
588 #[test]
589 fn test_workload_analysis() {
590 let request = BuildRequest {
591 header: MessageHeaderRequest {
592 message_id: Some("TEST_001".to_string()),
593 message_sender: PartyRequest {
594 party_name: vec![LocalizedStringRequest {
595 text: "Test Sender".to_string(),
596 language_code: None,
597 }],
598 party_id: Some("SENDER_001".to_string()),
599 party_reference: None,
600 },
601 message_recipient: PartyRequest {
602 party_name: vec![LocalizedStringRequest {
603 text: "Test Recipient".to_string(),
604 language_code: None,
605 }],
606 party_id: Some("RECIPIENT_001".to_string()),
607 party_reference: None,
608 },
609 message_control_type: None,
610 message_created_date_time: None,
611 },
612 version: "4.3".to_string(),
613 profile: None,
614 releases: vec![],
615 deals: vec![],
616 extensions: None,
617 };
618
619 let analysis = WorkloadAnalyzer::analyze_workload(&request);
620 assert_eq!(analysis.total_tracks, 0);
621 assert_eq!(analysis.total_releases, 0);
622 }
623
624 #[test]
625 fn test_track_validation() {
626 let config = ParallelConfig::default();
627 let processor = ParallelProcessor::new(config).unwrap();
628
629 let valid_track = TrackRequest {
630 track_id: "T001".to_string(),
631 resource_reference: Some("A001".to_string()),
632 isrc: "USRC17607839".to_string(), title: "Test Track".to_string(),
634 duration: "PT3M30S".to_string(),
635 artist: "Test Artist".to_string(),
636 };
637
638 let result = processor.validate_track(&valid_track);
639 assert!(result.is_ok());
640
641 let invalid_track = TrackRequest {
642 track_id: "T002".to_string(),
643 resource_reference: None,
644 isrc: "INVALID".to_string(), title: "".to_string(), duration: "3:30".to_string(), artist: "Test Artist".to_string(),
648 };
649
650 let result = processor.validate_track(&invalid_track);
651 assert!(result.is_err());
652 }
653
654 #[test]
655 fn test_performance_target_checking() {
656 let result = ParallelBuildResult {
657 elements: vec![],
658 processing_time: std::time::Duration::from_millis(3),
659 used_parallel: false,
660 thread_count: 1,
661 total_tracks: 1,
662 };
663
664 assert!(result.meets_performance_target()); let slow_result = ParallelBuildResult {
667 elements: vec![],
668 processing_time: std::time::Duration::from_millis(15),
669 used_parallel: true,
670 thread_count: 4,
671 total_tracks: 12,
672 };
673
674 assert!(!slow_result.meets_performance_target()); }
676}