1use crate::ast::{Element, Node};
7use crate::builder::{BuildRequest, ReleaseRequest, TrackRequest};
8use crate::error::BuildError;
9use crate::optimized_strings::BuildContext;
10use crate::memory_optimization::BuildMemoryManager;
11use indexmap::IndexMap;
12use rayon::prelude::*;
13use std::sync::{Arc, Mutex};
14use std::time::Instant;
15
/// Settings that control when and how work is spread across threads.
#[derive(Debug, Clone)]
pub struct ParallelConfig {
    /// Minimum item count (tracks, elements or validation items) at which
    /// the parallel code paths are chosen over the sequential ones.
    pub parallel_threshold: usize,
    /// Number of worker threads for the pool; `None` keeps the pool
    /// builder's default.
    pub max_threads: Option<usize>,
    /// Allows item validation to run in parallel.
    pub parallel_validation: bool,
    /// Allows XML section generation to run in parallel.
    pub parallel_xml_generation: bool,
}
28
29impl Default for ParallelConfig {
30 fn default() -> Self {
31 Self {
32 parallel_threshold: 5, max_threads: None, parallel_validation: true,
35 parallel_xml_generation: true,
36 }
37 }
38}
39
40pub struct ParallelProcessor {
42 config: ParallelConfig,
43 thread_pool: rayon::ThreadPool,
44}
45
46impl ParallelProcessor {
47 pub fn new(config: ParallelConfig) -> Result<Self, BuildError> {
49 let thread_pool = if let Some(max_threads) = config.max_threads {
50 rayon::ThreadPoolBuilder::new()
51 .num_threads(max_threads)
52 .build()
53 .map_err(|e| BuildError::Parallel(e.to_string()))?
54 } else {
55 rayon::ThreadPoolBuilder::new()
56 .build()
57 .map_err(|e| BuildError::Parallel(e.to_string()))?
58 };
59
60 Ok(Self {
61 config,
62 thread_pool,
63 })
64 }
65
66 pub fn process_build_parallel(
68 &self,
69 request: &BuildRequest,
70 context: &mut BuildContext,
71 memory_manager: &BuildMemoryManager,
72 ) -> Result<ParallelBuildResult, BuildError> {
73 let start_time = Instant::now();
74
75 let total_tracks: usize = request.releases.iter()
77 .map(|r| r.tracks.len())
78 .sum();
79
80 let use_parallel = total_tracks >= self.config.parallel_threshold;
81
82 let result = if use_parallel {
83 self.process_parallel_impl(request, context, memory_manager)?
84 } else {
85 self.process_sequential_impl(request, context, memory_manager)?
86 };
87
88 let processing_time = start_time.elapsed();
89
90 Ok(ParallelBuildResult {
91 elements: result,
92 processing_time,
93 used_parallel: use_parallel,
94 thread_count: if use_parallel {
95 self.thread_pool.current_num_threads()
96 } else {
97 1
98 },
99 total_tracks,
100 })
101 }
102
103 fn process_parallel_impl(
105 &self,
106 request: &BuildRequest,
107 _context: &mut BuildContext,
108 _memory_manager: &BuildMemoryManager,
109 ) -> Result<Vec<ProcessedElement>, BuildError> {
110 self.thread_pool.install(|| {
111 let processed_releases: Result<Vec<_>, BuildError> = request.releases
113 .par_iter()
114 .map(|release| self.process_release_parallel(release))
115 .collect();
116
117 let releases = processed_releases?;
118
119 Ok(releases.into_iter().flatten().collect())
121 })
122 }
123
124 fn process_sequential_impl(
126 &self,
127 request: &BuildRequest,
128 _context: &mut BuildContext,
129 _memory_manager: &BuildMemoryManager,
130 ) -> Result<Vec<ProcessedElement>, BuildError> {
131 let mut results = Vec::new();
132
133 for release in &request.releases {
134 let processed = self.process_release_sequential(release)?;
135 results.extend(processed);
136 }
137
138 Ok(results)
139 }
140
141 fn process_release_parallel(&self, release: &ReleaseRequest) -> Result<Vec<ProcessedElement>, BuildError> {
143 if release.tracks.len() >= self.config.parallel_threshold {
145 let processed_tracks: Result<Vec<_>, BuildError> = release.tracks
146 .par_iter()
147 .map(|track| self.process_track(track))
148 .collect();
149
150 let tracks = processed_tracks?;
151
152 let release_element = ProcessedElement {
154 name: "Release".to_string(),
155 processing_time: std::time::Duration::from_nanos(1), element_count: 1 + tracks.len(),
157 };
158
159 let mut result = vec![release_element];
160 result.extend(tracks);
161 Ok(result)
162 } else {
163 self.process_release_sequential(release)
164 }
165 }
166
167 fn process_release_sequential(&self, release: &ReleaseRequest) -> Result<Vec<ProcessedElement>, BuildError> {
169 let mut results = Vec::new();
170
171 let release_element = ProcessedElement {
173 name: "Release".to_string(),
174 processing_time: std::time::Duration::from_nanos(1),
175 element_count: 1,
176 };
177 results.push(release_element);
178
179 for track in &release.tracks {
181 results.push(self.process_track(track)?);
182 }
183
184 Ok(results)
185 }
186
187 fn process_track(&self, track: &TrackRequest) -> Result<ProcessedElement, BuildError> {
189 let start_time = Instant::now();
190
191 let _validated = self.validate_track(track)?;
194
195 let processing_time = start_time.elapsed();
196
197 Ok(ProcessedElement {
198 name: format!("Track_{}", track.track_id),
199 processing_time,
200 element_count: 1,
201 })
202 }
203
204 fn validate_track(&self, track: &TrackRequest) -> Result<ValidatedTrack, BuildError> {
206 if track.isrc.len() != 12 {
208 return Err(BuildError::Validation(format!(
209 "Invalid ISRC length for track {}: expected 12 characters, got {}",
210 track.track_id,
211 track.isrc.len()
212 )));
213 }
214
215 if !track.duration.starts_with("PT") {
217 return Err(BuildError::Validation(format!(
218 "Invalid duration format for track {}: must start with 'PT'",
219 track.track_id
220 )));
221 }
222
223 if track.title.trim().is_empty() {
225 return Err(BuildError::Validation(format!(
226 "Track title cannot be empty for track {}",
227 track.track_id
228 )));
229 }
230
231 Ok(ValidatedTrack {
232 track_id: track.track_id.clone(),
233 isrc: track.isrc.clone(),
234 title: track.title.clone(),
235 duration: track.duration.clone(),
236 artist: track.artist.clone(),
237 })
238 }
239
240 pub fn generate_xml_sections_parallel(
242 &self,
243 elements: &[Element],
244 context: &Arc<Mutex<BuildContext>>,
245 ) -> Result<Vec<String>, BuildError> {
246 if elements.len() < self.config.parallel_threshold || !self.config.parallel_xml_generation {
247 return self.generate_xml_sections_sequential(elements, context);
248 }
249
250 self.thread_pool.install(|| {
251 elements.par_iter()
252 .map(|element| {
253 let mut local_context = BuildContext::new();
255
256 self.element_to_xml_string(element, &mut local_context)
258 })
259 .collect()
260 })
261 }
262
263 fn generate_xml_sections_sequential(
265 &self,
266 elements: &[Element],
267 context: &Arc<Mutex<BuildContext>>,
268 ) -> Result<Vec<String>, BuildError> {
269 let mut results = Vec::with_capacity(elements.len());
270
271 for element in elements {
272 let mut context = context.lock().unwrap();
273 let xml = self.element_to_xml_string(element, &mut context)?;
274 results.push(xml);
275 }
276
277 Ok(results)
278 }
279
280 fn element_to_xml_string(&self, element: &Element, context: &mut BuildContext) -> Result<String, BuildError> {
282 let mut buffer = context.get_xml_buffer(256);
284
285 buffer.push('<');
286 buffer.push_str(&element.name);
287
288 for (key, value) in &element.attributes {
290 buffer.push_str(&format!(" {}=\"{}\"", key, value));
291 }
292
293 if element.children.is_empty() {
294 buffer.push_str("/>");
295 } else {
296 buffer.push('>');
297
298 for child in &element.children {
300 match child {
301 Node::Text(text) => buffer.push_str(text),
302 Node::Element(child_element) => {
303 let child_xml = self.element_to_xml_string(child_element, context)?;
304 buffer.push_str(&child_xml);
305 }
306 Node::Comment(comment) => {
307 buffer.push_str(&comment.to_xml());
308 }
309 Node::SimpleComment(comment) => {
310 buffer.push_str(&format!("<!-- {} -->", comment));
311 }
312 }
313 }
314
315 buffer.push_str(&format!("</{}>", element.name));
316 }
317
318 let result = buffer.clone();
319 context.return_xml_buffer(buffer);
320
321 Ok(result)
322 }
323
324 pub fn validate_items_parallel<T, F>(
326 &self,
327 items: &[T],
328 validator: F,
329 ) -> Result<Vec<ValidationResult>, BuildError>
330 where
331 T: Send + Sync,
332 F: Fn(&T) -> Result<(), BuildError> + Send + Sync,
333 {
334 if items.len() < self.config.parallel_threshold || !self.config.parallel_validation {
335 return self.validate_items_sequential(items, validator);
336 }
337
338 let validation_results: Vec<ValidationResult> = self.thread_pool.install(|| {
339 items.par_iter()
340 .map(|item| {
341 let start_time = Instant::now();
342 let result = validator(item);
343 let processing_time = start_time.elapsed();
344
345 ValidationResult {
346 success: result.is_ok(),
347 error: result.err(),
348 processing_time,
349 }
350 })
351 .collect()
352 });
353
354 for result in &validation_results {
356 if !result.success {
357 if let Some(ref err) = result.error {
358 return Err(err.clone());
359 }
360 }
361 }
362
363 Ok(validation_results)
364 }
365
366 fn validate_items_sequential<T, F>(
368 &self,
369 items: &[T],
370 validator: F,
371 ) -> Result<Vec<ValidationResult>, BuildError>
372 where
373 F: Fn(&T) -> Result<(), BuildError>,
374 {
375 let mut results = Vec::with_capacity(items.len());
376
377 for item in items {
378 let start_time = Instant::now();
379 let result = validator(item);
380 let processing_time = start_time.elapsed();
381
382 results.push(ValidationResult {
383 success: result.is_ok(),
384 error: result.err(),
385 processing_time,
386 });
387 }
388
389 Ok(results)
390 }
391}
392
393#[derive(Debug)]
395pub struct ParallelBuildResult {
396 pub elements: Vec<ProcessedElement>,
397 pub processing_time: std::time::Duration,
398 pub used_parallel: bool,
399 pub thread_count: usize,
400 pub total_tracks: usize,
401}
402
403impl ParallelBuildResult {
404 pub fn meets_performance_target(&self) -> bool {
406 match self.total_tracks {
407 1 => self.processing_time.as_millis() < 5, 2..=20 => self.processing_time.as_millis() < 10, _ => self.processing_time.as_millis() < 50, }
411 }
412
413 pub fn performance_summary(&self) -> String {
415 format!(
416 "Processed {} tracks in {:.2}ms using {} thread(s) ({}parallel)",
417 self.total_tracks,
418 self.processing_time.as_millis(),
419 self.thread_count,
420 if self.used_parallel { "" } else { "non-" }
421 )
422 }
423}
424
/// One processed output element together with its processing cost.
#[derive(Debug)]
pub struct ProcessedElement {
    /// Element label, e.g. `"Release"` or `Track_<track_id>`.
    pub name: String,
    /// Time recorded for producing this element.
    pub processing_time: std::time::Duration,
    /// Number of elements this entry accounts for.
    pub element_count: usize,
}
432
/// Owned copy of a track's fields, produced once the track passed validation.
#[derive(Debug)]
struct ValidatedTrack {
    /// Identifier of the track within the request.
    pub track_id: String,
    /// ISRC value as supplied on the request.
    pub isrc: String,
    /// Track title as supplied on the request.
    pub title: String,
    /// Duration string as supplied on the request.
    pub duration: String,
    /// Artist as supplied on the request.
    pub artist: String,
}
442
443#[derive(Debug)]
445pub struct ValidationResult {
446 pub success: bool,
447 pub error: Option<BuildError>,
448 pub processing_time: std::time::Duration,
449}
450
/// Stateless helper that inspects a build request and suggests a
/// [`ParallelConfig`] for it.
pub struct WorkloadAnalyzer;
453
454impl WorkloadAnalyzer {
455 pub fn analyze_workload(request: &BuildRequest) -> WorkloadAnalysis {
457 let total_releases = request.releases.len();
458 let total_tracks: usize = request.releases.iter()
459 .map(|r| r.tracks.len())
460 .sum();
461
462 let max_tracks_per_release = request.releases.iter()
463 .map(|r| r.tracks.len())
464 .max()
465 .unwrap_or(0);
466
467 let complexity_score = Self::calculate_complexity_score(request);
468
469 WorkloadAnalysis {
470 total_releases,
471 total_tracks,
472 max_tracks_per_release,
473 complexity_score,
474 recommended_config: Self::recommend_config(total_tracks, complexity_score),
475 }
476 }
477
478 fn calculate_complexity_score(request: &BuildRequest) -> f32 {
480 let mut score = 0.0;
481
482 let total_tracks: usize = request.releases.iter()
484 .map(|r| r.tracks.len())
485 .sum();
486 score += total_tracks as f32 * 1.0;
487
488 score += request.releases.len() as f32 * 0.5;
490
491 score += request.deals.len() as f32 * 2.0; if request.extensions.is_some() {
496 score += 1.0;
497 }
498
499 score
500 }
501
502 fn recommend_config(total_tracks: usize, complexity_score: f32) -> ParallelConfig {
504 let parallel_threshold = if complexity_score > 20.0 {
505 3 } else if total_tracks > 50 {
507 5 } else {
509 10 };
511
512 let max_threads = if total_tracks > 100 {
513 None } else if total_tracks > 20 {
515 Some(num_cpus::get().min(4)) } else {
517 Some(2) };
519
520 ParallelConfig {
521 parallel_threshold,
522 max_threads,
523 parallel_validation: complexity_score > 10.0,
524 parallel_xml_generation: total_tracks > 15,
525 }
526 }
527}
528
529#[derive(Debug)]
531pub struct WorkloadAnalysis {
532 pub total_releases: usize,
533 pub total_tracks: usize,
534 pub max_tracks_per_release: usize,
535 pub complexity_score: f32,
536 pub recommended_config: ParallelConfig,
537}
538
#[cfg(test)]
mod tests {
    use super::*;
    use crate::builder::{LocalizedStringRequest, MessageHeaderRequest, PartyRequest};

    /// Builds a party with one language-neutral name and the given id.
    fn party(name: &str, id: &str) -> PartyRequest {
        PartyRequest {
            party_name: vec![LocalizedStringRequest {
                text: name.to_string(),
                language_code: None,
            }],
            party_id: Some(id.to_string()),
            party_reference: None,
        }
    }

    /// Builds a track request; only the fields relevant to validation vary.
    fn track(track_id: &str, isrc: &str, title: &str, duration: &str) -> TrackRequest {
        TrackRequest {
            track_id: track_id.to_string(),
            resource_reference: None,
            isrc: isrc.to_string(),
            title: title.to_string(),
            duration: duration.to_string(),
            artist: "Test Artist".to_string(),
        }
    }

    #[test]
    fn test_parallel_processor_creation() {
        let processor = ParallelProcessor::new(ParallelConfig::default());
        assert!(processor.is_ok());
    }

    #[test]
    fn test_workload_analysis() {
        let request = BuildRequest {
            header: MessageHeaderRequest {
                message_id: Some("TEST_001".to_string()),
                message_sender: party("Test Sender", "SENDER_001"),
                message_recipient: party("Test Recipient", "RECIPIENT_001"),
                message_control_type: None,
                message_created_date_time: None,
            },
            version: "4.3".to_string(),
            profile: None,
            releases: vec![],
            deals: vec![],
            extensions: None,
        };

        let analysis = WorkloadAnalyzer::analyze_workload(&request);
        assert_eq!(analysis.total_tracks, 0);
        assert_eq!(analysis.total_releases, 0);
        assert_eq!(analysis.max_tracks_per_release, 0);
    }

    #[test]
    fn test_track_validation() {
        let processor = ParallelProcessor::new(ParallelConfig::default()).unwrap();

        let valid_track = track("T001", "USRC17607839", "Test Track", "PT3M30S");
        assert!(processor.validate_track(&valid_track).is_ok());

        // Breaks every rule at once; validation must still reject it.
        let invalid_track = track("T002", "INVALID", "", "3:30");
        assert!(processor.validate_track(&invalid_track).is_err());
    }

    /// Each rule is violated on its own so that every check is exercised,
    /// not just the first one in the validation order.
    #[test]
    fn test_track_validation_reports_each_rule() {
        let processor = ParallelProcessor::new(ParallelConfig::default()).unwrap();

        let cases = [
            (track("T010", "INVALID", "Test Track", "PT3M30S"), "ISRC"),
            (track("T011", "USRC17607839", "Test Track", "3:30"), "duration"),
            (track("T012", "USRC17607839", "   ", "PT3M30S"), "title"),
        ];

        for (request, expected_fragment) in &cases {
            match processor.validate_track(request) {
                Err(BuildError::Validation(message)) => assert!(
                    message.contains(expected_fragment),
                    "unexpected validation message: {}",
                    message
                ),
                other => panic!("expected a validation error, got {:?}", other),
            }
        }
    }

    #[test]
    fn test_performance_target_checking() {
        let result = ParallelBuildResult {
            elements: vec![],
            processing_time: std::time::Duration::from_millis(3),
            used_parallel: false,
            thread_count: 1,
            total_tracks: 1,
        };

        // 3 ms is inside the 5 ms budget for a single track.
        assert!(result.meets_performance_target());

        let slow_result = ParallelBuildResult {
            elements: vec![],
            processing_time: std::time::Duration::from_millis(15),
            used_parallel: true,
            thread_count: 4,
            total_tracks: 12,
        };

        // 15 ms exceeds the 10 ms budget for 2 to 20 tracks.
        assert!(!slow_result.meets_performance_target());
    }
}