ddex_builder/streaming/
mod.rs

1//! Streaming DDEX XML builder for large catalogs
2//!
3//! This module provides a memory-efficient streaming approach to building DDEX XML
4//! that can handle catalogs with thousands of releases without loading everything
5//! into memory at once.
6
7pub mod buffer_manager;
8pub mod reference_manager;
9
10use crate::builder::MessageHeaderRequest;
11use crate::determinism::DeterminismConfig;
12use crate::error::{BuildError, BuildWarning};
13use buffer_manager::BufferManager;
14use reference_manager::StreamingReferenceManager;
15use std::io::Write as IoWrite;
16use uuid::Uuid;
17
/// Configuration for streaming builder
///
/// Groups the tunables for the streaming DDEX XML builder: the flush
/// threshold for the in-memory XML buffer, determinism settings, a validation
/// toggle, and the progress-reporting cadence.
///
/// NOTE(review): within this module only `max_buffer_size` and
/// `progress_callback_frequency` are read directly; `deterministic`,
/// `determinism_config` and `validate_during_stream` are stored but not
/// consulted here — confirm where (or whether) they take effect.
///
/// # Example
/// ```
/// use ddex_builder::streaming::StreamingConfig;
/// use ddex_builder::determinism::DeterminismConfig;
///
/// let config = StreamingConfig {
///     max_buffer_size: 5 * 1024 * 1024, // 5MB buffer
///     deterministic: true,
///     determinism_config: DeterminismConfig::default(),
///     validate_during_stream: true,
///     progress_callback_frequency: 50, // Report every 50 items
/// };
/// ```
#[derive(Debug, Clone)]
pub struct StreamingConfig {
    /// Maximum buffer size in bytes before automatic flush (default: 10MB).
    ///
    /// Passed to the buffer manager on construction and also used as the
    /// threshold at which the builder hands its staged XML to that manager.
    pub max_buffer_size: usize,
    /// Whether to use deterministic ordering for consistent output
    pub deterministic: bool,
    /// Detailed configuration for deterministic behavior
    pub determinism_config: DeterminismConfig,
    /// Whether to validate data while streaming (slower but safer)
    pub validate_during_stream: bool,
    /// Progress callback frequency - report progress every N items processed
    /// (default: 100).
    ///
    /// The write paths take a remainder by this value, so callers should pass
    /// at least 1.
    pub progress_callback_frequency: usize,
}
49
50impl Default for StreamingConfig {
51    fn default() -> Self {
52        Self {
53            max_buffer_size: 10 * 1024 * 1024, // 10MB
54            deterministic: true,
55            determinism_config: DeterminismConfig::default(),
56            validate_during_stream: true,
57            progress_callback_frequency: 100,
58        }
59    }
60}
61
/// Progress information for streaming operations
///
/// A snapshot of the builder's counters, handed by value to the registered
/// progress callback while a message is being written.
///
/// # Example
/// ```ignore
/// use ddex_builder::streaming::{StreamingBuilder, StreamingProgress};
///
/// // `output_writer` is any `std::io::Write` implementation supplied by the caller.
/// let mut builder = StreamingBuilder::new(output_writer)?;
/// builder.set_progress_callback(Box::new(|progress: StreamingProgress| {
///     println!("Progress: {} items ({:.1}%), {} MB written",
///              progress.releases_written + progress.resources_written,
///              progress.estimated_completion_percent.unwrap_or(0.0),
///              progress.bytes_written / 1024 / 1024);
/// }));
/// ```
#[derive(Debug, Clone)]
pub struct StreamingProgress {
    /// Number of releases written to the stream so far
    pub releases_written: usize,
    /// Number of resources (tracks/recordings) written to the stream so far
    pub resources_written: usize,
    /// Total bytes written so far, as reported by the buffer manager
    pub bytes_written: usize,
    /// Current memory usage in bytes: the builder's staged XML plus the
    /// buffer manager's current buffer size
    pub current_memory_usage: usize,
    /// Estimated completion percentage (0.0-100.0) if total items was set;
    /// `None` when no estimated total was provided
    pub estimated_completion_percent: Option<f64>,
}
92
/// Callback type for progress updates
///
/// A boxed closure that receives a [`StreamingProgress`] snapshot by value.
/// The `Send + Sync` bounds are part of the alias, so the closure must be
/// safe to send to and share between threads.
pub type ProgressCallback = Box<dyn Fn(StreamingProgress) + Send + Sync>;
95
/// Streaming DDEX XML builder
///
/// Builds a `NewReleaseMessage` document piece by piece. XML fragments are
/// staged in `xml_buffer` and handed to the buffer manager (which owns the
/// writer `W`) once the staged bytes reach `config.max_buffer_size`, and
/// unconditionally when the message is finished.
///
/// The expected call order is: `start_message`, any number of
/// `write_resource`, `finish_resources_start_releases`, any number of
/// `write_release`, then `finish_message`.
///
/// NOTE(review): only "started" and "finished" are tracked; the order of the
/// calls in between is not enforced by the state flags below.
pub struct StreamingBuilder<W: IoWrite> {
    /// Receives flushed chunks; constructed from the caller's writer.
    buffer_manager: BufferManager<W>,
    /// Source of the resource and release references written into the XML.
    reference_manager: StreamingReferenceManager,
    /// Settings supplied at construction time.
    config: StreamingConfig,
    /// Staging area for generated XML that has not yet been handed to
    /// `buffer_manager`.
    xml_buffer: Vec<u8>,

    // State tracking
    /// Set once `start_message` has completed its writes.
    message_started: bool,
    /// Set once `finish_message` has flushed everything.
    message_finished: bool,
    /// Count of releases written so far.
    releases_written: usize,
    /// Count of resources written so far.
    resources_written: usize,
    /// Count of deals written; initialised to 0 and only read back in this
    /// file (no method here increments it).
    deals_written: usize,
    /// Warnings collected during the build; no method in this file pushes to
    /// it, it is only cloned into the final statistics.
    warnings: Vec<BuildWarning>,

    // Progress tracking
    /// Optional observer invoked with progress snapshots.
    progress_callback: Option<ProgressCallback>,
    /// Caller-provided estimate of the total item count, used to compute a
    /// completion percentage.
    estimated_total_items: Option<usize>,
}
115
116impl<W: IoWrite> StreamingBuilder<W> {
117    /// Create a new streaming builder with the given writer
118    pub fn new(writer: W) -> Result<Self, BuildError> {
119        Self::new_with_config(writer, StreamingConfig::default())
120    }
121
122    /// Create a new streaming builder with custom configuration
123    pub fn new_with_config(writer: W, config: StreamingConfig) -> Result<Self, BuildError> {
124        let buffer_manager = BufferManager::new(writer, config.max_buffer_size).map_err(|e| {
125            BuildError::XmlGeneration(format!("Failed to create buffer manager: {}", e))
126        })?;
127
128        Ok(StreamingBuilder {
129            buffer_manager,
130            reference_manager: StreamingReferenceManager::new(),
131            config,
132            xml_buffer: Vec::new(),
133            message_started: false,
134            message_finished: false,
135            releases_written: 0,
136            resources_written: 0,
137            deals_written: 0,
138            warnings: Vec::new(),
139            progress_callback: None,
140            estimated_total_items: None,
141        })
142    }
143
144    /// Set a progress callback function
145    pub fn set_progress_callback(&mut self, callback: ProgressCallback) {
146        self.progress_callback = Some(callback);
147    }
148
149    /// Set estimated total number of items for progress calculation
150    pub fn set_estimated_total(&mut self, total: usize) {
151        self.estimated_total_items = Some(total);
152    }
153
154    /// Start the DDEX message with header information
155    pub fn start_message(
156        &mut self,
157        header: &MessageHeaderRequest,
158        version: &str,
159    ) -> Result<(), BuildError> {
160        if self.message_started {
161            return Err(BuildError::XmlGeneration(
162                "Message already started".to_string(),
163            ));
164        }
165
166        // Write XML declaration and start of message
167        let xml_start = format!(
168            r#"<?xml version="1.0" encoding="UTF-8"?>
169<NewReleaseMessage xmlns="http://ddex.net/xml/ern/43" MessageSchemaVersionId="{}" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
170"#,
171            version
172        );
173
174        self.xml_buffer.extend_from_slice(xml_start.as_bytes());
175
176        // Write message header
177        self.write_message_header(header)?;
178
179        // Start resource list container
180        self.xml_buffer.extend_from_slice(b"  <ResourceList>\n");
181
182        self.message_started = true;
183        self.flush_if_needed()?;
184
185        Ok(())
186    }
187
188    /// Write a single resource to the stream
189    pub fn write_resource(
190        &mut self,
191        resource_id: &str,
192        title: &str,
193        artist: &str,
194        isrc: Option<&str>,
195        duration: Option<&str>,
196        file_path: Option<&str>,
197    ) -> Result<String, BuildError> {
198        if !self.message_started || self.message_finished {
199            return Err(BuildError::XmlGeneration(
200                "Message not in valid state for writing resources".to_string(),
201            ));
202        }
203
204        // Generate stable reference for this resource
205        let resource_ref = self
206            .reference_manager
207            .generate_resource_reference(resource_id)?;
208
209        // Build SoundRecording XML
210        let mut resource_xml = String::new();
211        resource_xml.push_str("    <SoundRecording>\n");
212        resource_xml.push_str(&format!(
213            "      <ResourceReference>{}</ResourceReference>\n",
214            resource_ref
215        ));
216        resource_xml.push_str("      <Type>SoundRecording</Type>\n");
217        resource_xml.push_str(&format!(
218            "      <ResourceId>{}</ResourceId>\n",
219            escape_xml(resource_id)
220        ));
221        resource_xml.push_str(&format!(
222            "      <ReferenceTitle>{}</ReferenceTitle>\n",
223            escape_xml(title)
224        ));
225        resource_xml.push_str(&format!(
226            "      <DisplayArtist>{}</DisplayArtist>\n",
227            escape_xml(artist)
228        ));
229
230        if let Some(isrc_value) = isrc {
231            resource_xml.push_str(&format!("      <ISRC>{}</ISRC>\n", escape_xml(isrc_value)));
232        }
233
234        if let Some(duration_value) = duration {
235            resource_xml.push_str(&format!(
236                "      <Duration>{}</Duration>\n",
237                escape_xml(duration_value)
238            ));
239        }
240
241        if let Some(file_path_value) = file_path {
242            resource_xml.push_str("      <TechnicalResourceDetails>\n");
243            resource_xml.push_str(&format!(
244                "        <FileName>{}</FileName>\n",
245                escape_xml(file_path_value)
246            ));
247            resource_xml.push_str("        <AudioCodecType>MP3</AudioCodecType>\n");
248            resource_xml.push_str("      </TechnicalResourceDetails>\n");
249        }
250
251        resource_xml.push_str("    </SoundRecording>\n");
252
253        self.xml_buffer.extend_from_slice(resource_xml.as_bytes());
254
255        self.resources_written += 1;
256
257        // Check for progress callback
258        if self.resources_written % self.config.progress_callback_frequency == 0 {
259            self.report_progress();
260        }
261
262        // Flush if buffer is getting large
263        self.flush_if_needed()?;
264
265        Ok(resource_ref)
266    }
267
268    /// Finish the resource section and start the release section
269    pub fn finish_resources_start_releases(&mut self) -> Result<(), BuildError> {
270        if !self.message_started || self.message_finished {
271            return Err(BuildError::XmlGeneration(
272                "Message not in valid state".to_string(),
273            ));
274        }
275
276        // End ResourceList and start ReleaseList
277        self.xml_buffer.extend_from_slice(b"  </ResourceList>\n");
278        self.xml_buffer.extend_from_slice(b"  <ReleaseList>\n");
279
280        self.flush_if_needed()?;
281        Ok(())
282    }
283
284    /// Write a single release to the stream
285    pub fn write_release(
286        &mut self,
287        release_id: &str,
288        title: &str,
289        artist: &str,
290        label: Option<&str>,
291        upc: Option<&str>,
292        release_date: Option<&str>,
293        genre: Option<&str>,
294        resource_references: &[String],
295    ) -> Result<String, BuildError> {
296        if !self.message_started || self.message_finished {
297            return Err(BuildError::XmlGeneration(
298                "Message not in valid state for writing releases".to_string(),
299            ));
300        }
301
302        // Generate stable reference for this release
303        let release_ref = self
304            .reference_manager
305            .generate_release_reference(release_id)?;
306
307        // Build Release XML
308        let mut release_xml = String::new();
309        release_xml.push_str("    <Release>\n");
310        release_xml.push_str(&format!(
311            "      <ReleaseReference>{}</ReleaseReference>\n",
312            release_ref
313        ));
314        release_xml.push_str(&format!(
315            "      <ReleaseId>{}</ReleaseId>\n",
316            escape_xml(release_id)
317        ));
318        release_xml.push_str("      <ReleaseType>Album</ReleaseType>\n");
319        release_xml.push_str(&format!("      <Title>{}</Title>\n", escape_xml(title)));
320        release_xml.push_str(&format!(
321            "      <DisplayArtist>{}</DisplayArtist>\n",
322            escape_xml(artist)
323        ));
324
325        if let Some(label_value) = label {
326            release_xml.push_str(&format!(
327                "      <LabelName>{}</LabelName>\n",
328                escape_xml(label_value)
329            ));
330        }
331
332        if let Some(upc_value) = upc {
333            release_xml.push_str(&format!("      <UPC>{}</UPC>\n", escape_xml(upc_value)));
334        }
335
336        if let Some(date_value) = release_date {
337            release_xml.push_str(&format!(
338                "      <ReleaseDate>{}</ReleaseDate>\n",
339                escape_xml(date_value)
340            ));
341        }
342
343        if let Some(genre_value) = genre {
344            release_xml.push_str(&format!(
345                "      <Genre>{}</Genre>\n",
346                escape_xml(genre_value)
347            ));
348        }
349
350        // Write ResourceGroup linking to resources
351        if !resource_references.is_empty() {
352            release_xml.push_str("      <ResourceGroup>\n");
353            for resource_ref in resource_references {
354                release_xml.push_str(&format!(
355                    "        <ResourceReference>{}</ResourceReference>\n",
356                    resource_ref
357                ));
358            }
359            release_xml.push_str("      </ResourceGroup>\n");
360        }
361
362        release_xml.push_str("    </Release>\n");
363
364        self.xml_buffer.extend_from_slice(release_xml.as_bytes());
365
366        self.releases_written += 1;
367
368        // Check for progress callback
369        if self.releases_written % self.config.progress_callback_frequency == 0 {
370            self.report_progress();
371        }
372
373        // Flush if buffer is getting large
374        self.flush_if_needed()?;
375
376        Ok(release_ref)
377    }
378
379    /// Finish the message and close all tags
380    pub fn finish_message(&mut self) -> Result<StreamingStats, BuildError> {
381        if !self.message_started || self.message_finished {
382            return Err(BuildError::XmlGeneration(
383                "Message not in valid state to finish".to_string(),
384            ));
385        }
386
387        // End ReleaseList and close root element
388        self.xml_buffer.extend_from_slice(b"  </ReleaseList>\n");
389        self.xml_buffer.extend_from_slice(b"</NewReleaseMessage>\n");
390
391        // Final flush of any remaining content
392        if !self.xml_buffer.is_empty() {
393            self.buffer_manager
394                .write_chunk(&self.xml_buffer)
395                .map_err(|e| {
396                    BuildError::XmlGeneration(format!("Failed to write final chunk: {}", e))
397                })?;
398            self.xml_buffer.clear();
399        }
400
401        // Final flush
402        self.buffer_manager
403            .flush_all()
404            .map_err(|e| BuildError::XmlGeneration(format!("Failed to flush: {}", e)))?;
405
406        self.message_finished = true;
407
408        Ok(StreamingStats {
409            releases_written: self.releases_written,
410            resources_written: self.resources_written,
411            deals_written: self.deals_written,
412            bytes_written: self.buffer_manager.total_bytes_written(),
413            warnings: self.warnings.clone(),
414            peak_memory_usage: self.buffer_manager.peak_buffer_size(),
415        })
416    }
417
418    // Private helper methods
419
420    fn write_message_header(&mut self, header: &MessageHeaderRequest) -> Result<(), BuildError> {
421        // Generate message ID if not provided
422        let default_id = Uuid::new_v4().to_string();
423        let message_id = header.message_id.as_deref().unwrap_or(&default_id);
424
425        let mut header_xml = String::new();
426        header_xml.push_str("  <MessageHeader>\n");
427        header_xml.push_str(&format!(
428            "    <MessageId>{}</MessageId>\n",
429            escape_xml(message_id)
430        ));
431
432        // Write MessageSender
433        header_xml.push_str("    <MessageSender>\n");
434        if !header.message_sender.party_name.is_empty() {
435            header_xml.push_str(&format!(
436                "      <PartyName>{}</PartyName>\n",
437                escape_xml(&header.message_sender.party_name[0].text)
438            ));
439        }
440        header_xml.push_str("    </MessageSender>\n");
441
442        // Write MessageRecipient
443        header_xml.push_str("    <MessageRecipient>\n");
444        if !header.message_recipient.party_name.is_empty() {
445            header_xml.push_str(&format!(
446                "      <PartyName>{}</PartyName>\n",
447                escape_xml(&header.message_recipient.party_name[0].text)
448            ));
449        }
450        header_xml.push_str("    </MessageRecipient>\n");
451
452        // Write MessageCreatedDateTime
453        let default_time = chrono::Utc::now().to_rfc3339();
454        let created_time = header
455            .message_created_date_time
456            .as_deref()
457            .unwrap_or(&default_time);
458        header_xml.push_str(&format!(
459            "    <MessageCreatedDateTime>{}</MessageCreatedDateTime>\n",
460            escape_xml(created_time)
461        ));
462
463        header_xml.push_str("  </MessageHeader>\n");
464
465        self.xml_buffer.extend_from_slice(header_xml.as_bytes());
466        Ok(())
467    }
468
469    fn flush_if_needed(&mut self) -> Result<(), BuildError> {
470        // Check if XML buffer is getting large and flush it
471        if self.xml_buffer.len() >= self.config.max_buffer_size {
472            self.buffer_manager
473                .write_chunk(&self.xml_buffer)
474                .map_err(|e| {
475                    BuildError::XmlGeneration(format!("Failed to flush XML buffer: {}", e))
476                })?;
477            self.xml_buffer.clear();
478        }
479        Ok(())
480    }
481
482    fn report_progress(&self) {
483        if let Some(ref callback) = self.progress_callback {
484            let current_memory = self.xml_buffer.len() + self.buffer_manager.current_buffer_size();
485
486            let completion_percent = if let Some(total) = self.estimated_total_items {
487                Some(
488                    ((self.releases_written + self.resources_written) as f64 / total as f64)
489                        * 100.0,
490                )
491            } else {
492                None
493            };
494
495            let progress = StreamingProgress {
496                releases_written: self.releases_written,
497                resources_written: self.resources_written,
498                bytes_written: self.buffer_manager.total_bytes_written(),
499                current_memory_usage: current_memory,
500                estimated_completion_percent: completion_percent,
501            };
502
503            callback(progress);
504        }
505    }
506}
507
/// Result of streaming build operation
///
/// Carries the counters, byte total, peak memory figure and warnings of a
/// completed streaming DDEX XML build.
///
/// NOTE(review): nothing in this module constructs or returns this type —
/// `StreamingBuilder::finish_message` returns `StreamingStats`, which has the
/// same fields. Confirm whether this type is still used elsewhere.
///
/// # Example
/// ```
/// use ddex_builder::streaming::StreamingResult;
///
/// let result = StreamingResult {
///     releases_written: 10,
///     resources_written: 120,
///     deals_written: 0,
///     bytes_written: 256 * 1024,
///     warnings: vec![],
///     peak_memory_usage: 64 * 1024,
/// };
/// println!("Built {} releases with {} resources",
///          result.releases_written, result.resources_written);
/// println!("Generated {} bytes using {} peak memory",
///          result.bytes_written, result.peak_memory_usage);
///
/// if !result.warnings.is_empty() {
///     println!("Warnings: {:?}", result.warnings);
/// }
/// ```
#[derive(Debug)]
pub struct StreamingResult {
    /// Total number of releases written to the stream
    pub releases_written: usize,
    /// Total number of resources (tracks/recordings) written to the stream
    pub resources_written: usize,
    /// Total number of deals written to the stream
    pub deals_written: usize,
    /// Total bytes written to the output stream
    pub bytes_written: usize,
    /// Any warnings generated during the streaming operation
    pub warnings: Vec<BuildWarning>,
    /// Peak memory usage in bytes during the streaming process
    pub peak_memory_usage: usize,
}
544
/// Statistics from a completed streaming operation
///
/// Returned by `StreamingBuilder::finish_message` once the closing tags have
/// been written and all output has been flushed.
///
/// # Example
/// ```
/// use ddex_builder::streaming::StreamingStats;
///
/// // Same shape as the value returned by finish_message()
/// let stats = StreamingStats {
///     releases_written: 1000,
///     resources_written: 15000,
///     deals_written: 50,
///     bytes_written: 25 * 1024 * 1024, // 25MB
///     warnings: vec![],
///     peak_memory_usage: 8 * 1024 * 1024, // 8MB peak
/// };
/// ```
#[derive(Debug, Clone)]
pub struct StreamingStats {
    /// Number of releases successfully written
    pub releases_written: usize,
    /// Number of resources successfully written
    pub resources_written: usize,
    /// Number of deals successfully written
    pub deals_written: usize,
    /// Total bytes written to the output, as reported by the buffer manager
    pub bytes_written: usize,
    /// List of warnings generated during streaming
    pub warnings: Vec<BuildWarning>,
    /// Peak buffer size reported by the buffer manager during streaming
    pub peak_memory_usage: usize,
}
579
/// Errors that can occur during streaming operations
///
/// Error type covering state-management errors, I/O failures, XML writing
/// problems and wrapped build errors. A `From<StreamingError>` conversion
/// into `BuildError` is provided in this module.
///
/// NOTE(review): the `StreamingBuilder` methods in this module return
/// `BuildError` directly rather than this type — confirm where
/// `StreamingError` is produced.
///
/// # Example
/// ```
/// use ddex_builder::streaming::StreamingError;
///
/// let err = StreamingError::InvalidState {
///     message: "message already finished".to_string(),
/// };
/// assert_eq!(err.to_string(), "Invalid state: message already finished");
/// ```
#[derive(Debug, thiserror::Error)]
pub enum StreamingError {
    /// Invalid state transition (e.g., writing resources after finishing message)
    #[error("Invalid state: {message}")]
    InvalidState {
        /// Description of the invalid state and what operation was attempted
        message: String,
    },

    /// I/O error during streaming operations (writing to output, flushing buffers)
    #[error("I/O error: {0}")]
    IoError(#[from] std::io::Error),

    /// XML writing or formatting error
    #[error("XML writing error: {0}")]
    XmlError(#[from] quick_xml::Error),

    /// General build error from the underlying builder system
    #[error("Build error: {0}")]
    BuildError(#[from] BuildError),
}
621
622impl From<StreamingError> for BuildError {
623    fn from(err: StreamingError) -> Self {
624        match err {
625            StreamingError::InvalidState { message } => BuildError::XmlGeneration(message),
626            StreamingError::IoError(e) => BuildError::XmlGeneration(format!("I/O error: {}", e)),
627            StreamingError::XmlError(e) => BuildError::XmlGeneration(format!("XML error: {}", e)),
628            StreamingError::BuildError(e) => e,
629        }
630    }
631}
632
/// Escape XML special characters
///
/// Replaces `&`, `<`, `>`, `"` and `'` with their predefined entity
/// references (`&amp;`, `&lt;`, `&gt;`, `&quot;`, `&apos;`) and copies every
/// other character through unchanged. Input that already contains entities
/// is escaped again (`&amp;` becomes `&amp;amp;`).
fn escape_xml(text: &str) -> String {
    // The output is at least as long as the input, so reserve that up front.
    let mut escaped = String::with_capacity(text.len());
    for ch in text.chars() {
        match ch {
            '&' => escaped.push_str("&amp;"),
            '<' => escaped.push_str("&lt;"),
            '>' => escaped.push_str("&gt;"),
            '"' => escaped.push_str("&quot;"),
            '\'' => escaped.push_str("&apos;"),
            other => escaped.push(other),
        }
    }
    escaped
}