// ddex_builder/streaming/mod.rs

//! Streaming DDEX XML builder for large catalogs.
//!
//! This module builds DDEX XML incrementally, so catalogs with thousands of
//! releases can be written without holding the whole document in memory.

7pub mod buffer_manager;
8pub mod reference_manager;
9
10use crate::builder::MessageHeaderRequest;
11use crate::error::{BuildError, BuildWarning};
12use crate::determinism::DeterminismConfig;
13use buffer_manager::BufferManager;
14use reference_manager::StreamingReferenceManager;
15use std::io::Write as IoWrite;
16use uuid::Uuid;
17
18/// Configuration for streaming builder
19#[derive(Debug, Clone)]
20pub struct StreamingConfig {
21    /// Maximum buffer size before automatic flush (default: 10MB)
22    pub max_buffer_size: usize,
23    /// Whether to use deterministic ordering
24    pub deterministic: bool,
25    /// Determinism configuration
26    pub determinism_config: DeterminismConfig,
27    /// Whether to validate while streaming
28    pub validate_during_stream: bool,
29    /// Progress callback frequency (every N items)
30    pub progress_callback_frequency: usize,
31}
32
33impl Default for StreamingConfig {
34    fn default() -> Self {
35        Self {
36            max_buffer_size: 10 * 1024 * 1024, // 10MB
37            deterministic: true,
38            determinism_config: DeterminismConfig::default(),
39            validate_during_stream: true,
40            progress_callback_frequency: 100,
41        }
42    }
43}
44
/// Snapshot handed to a [`ProgressCallback`] while a message is being streamed.
#[derive(Clone, Debug)]
pub struct StreamingProgress {
    /// Releases written so far.
    pub releases_written: usize,
    /// Resources written so far.
    pub resources_written: usize,
    /// Bytes the buffer manager reports as written so far.
    pub bytes_written: usize,
    /// Bytes currently held in the builder's in-memory buffers.
    pub current_memory_usage: usize,
    /// Releases plus resources written, as a percentage of the caller-supplied
    /// estimate; `None` when no estimate was set.
    pub estimated_completion_percent: Option<f64>,
}
54
55/// Callback type for progress updates
56pub type ProgressCallback = Box<dyn Fn(StreamingProgress) + Send + Sync>;
57
58/// Streaming DDEX XML builder
59pub struct StreamingBuilder<W: IoWrite> {
60    buffer_manager: BufferManager<W>,
61    reference_manager: StreamingReferenceManager,
62    config: StreamingConfig,
63    xml_buffer: Vec<u8>,
64    
65    // State tracking
66    message_started: bool,
67    message_finished: bool,
68    releases_written: usize,
69    resources_written: usize,
70    deals_written: usize,
71    warnings: Vec<BuildWarning>,
72    
73    // Progress tracking
74    progress_callback: Option<ProgressCallback>,
75    estimated_total_items: Option<usize>,
76}
77
78impl<W: IoWrite> StreamingBuilder<W> {
79    /// Create a new streaming builder with the given writer
80    pub fn new(writer: W) -> Result<Self, BuildError> {
81        Self::new_with_config(writer, StreamingConfig::default())
82    }
83    
84    /// Create a new streaming builder with custom configuration
85    pub fn new_with_config(writer: W, config: StreamingConfig) -> Result<Self, BuildError> {
86        let buffer_manager = BufferManager::new(writer, config.max_buffer_size)
87            .map_err(|e| BuildError::XmlGeneration(format!("Failed to create buffer manager: {}", e)))?;
88        
89        Ok(StreamingBuilder {
90            buffer_manager,
91            reference_manager: StreamingReferenceManager::new(),
92            config,
93            xml_buffer: Vec::new(),
94            message_started: false,
95            message_finished: false,
96            releases_written: 0,
97            resources_written: 0,
98            deals_written: 0,
99            warnings: Vec::new(),
100            progress_callback: None,
101            estimated_total_items: None,
102        })
103    }
104    
105    /// Set a progress callback function
106    pub fn set_progress_callback(&mut self, callback: ProgressCallback) {
107        self.progress_callback = Some(callback);
108    }
109    
110    /// Set estimated total number of items for progress calculation
111    pub fn set_estimated_total(&mut self, total: usize) {
112        self.estimated_total_items = Some(total);
113    }
114    
115    /// Start the DDEX message with header information
116    pub fn start_message(&mut self, header: &MessageHeaderRequest, version: &str) -> Result<(), BuildError> {
117        if self.message_started {
118            return Err(BuildError::XmlGeneration("Message already started".to_string()));
119        }
120        
121        // Write XML declaration and start of message
122        let xml_start = format!(
123            r#"<?xml version="1.0" encoding="UTF-8"?>
124<NewReleaseMessage xmlns="http://ddex.net/xml/ern/43" MessageSchemaVersionId="{}" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
125"#, version);
126        
127        self.xml_buffer.extend_from_slice(xml_start.as_bytes());
128        
129        // Write message header
130        self.write_message_header(header)?;
131        
132        // Start resource list container
133        self.xml_buffer.extend_from_slice(b"  <ResourceList>\n");
134        
135        self.message_started = true;
136        self.flush_if_needed()?;
137        
138        Ok(())
139    }
140    
141    /// Write a single resource to the stream
142    pub fn write_resource(&mut self, 
143                         resource_id: &str, 
144                         title: &str, 
145                         artist: &str,
146                         isrc: Option<&str>,
147                         duration: Option<&str>,
148                         file_path: Option<&str>) -> Result<String, BuildError> {
149        if !self.message_started || self.message_finished {
150            return Err(BuildError::XmlGeneration("Message not in valid state for writing resources".to_string()));
151        }
152        
153        // Generate stable reference for this resource
154        let resource_ref = self.reference_manager.generate_resource_reference(resource_id)?;
155        
156        // Build SoundRecording XML
157        let mut resource_xml = String::new();
158        resource_xml.push_str("    <SoundRecording>\n");
159        resource_xml.push_str(&format!("      <ResourceReference>{}</ResourceReference>\n", resource_ref));
160        resource_xml.push_str("      <Type>SoundRecording</Type>\n");
161        resource_xml.push_str(&format!("      <ResourceId>{}</ResourceId>\n", escape_xml(resource_id)));
162        resource_xml.push_str(&format!("      <ReferenceTitle>{}</ReferenceTitle>\n", escape_xml(title)));
163        resource_xml.push_str(&format!("      <DisplayArtist>{}</DisplayArtist>\n", escape_xml(artist)));
164        
165        if let Some(isrc_value) = isrc {
166            resource_xml.push_str(&format!("      <ISRC>{}</ISRC>\n", escape_xml(isrc_value)));
167        }
168        
169        if let Some(duration_value) = duration {
170            resource_xml.push_str(&format!("      <Duration>{}</Duration>\n", escape_xml(duration_value)));
171        }
172        
173        if let Some(file_path_value) = file_path {
174            resource_xml.push_str("      <TechnicalResourceDetails>\n");
175            resource_xml.push_str(&format!("        <FileName>{}</FileName>\n", escape_xml(file_path_value)));
176            resource_xml.push_str("        <AudioCodecType>MP3</AudioCodecType>\n");
177            resource_xml.push_str("      </TechnicalResourceDetails>\n");
178        }
179        
180        resource_xml.push_str("    </SoundRecording>\n");
181        
182        self.xml_buffer.extend_from_slice(resource_xml.as_bytes());
183        
184        self.resources_written += 1;
185        
186        // Check for progress callback
187        if self.resources_written % self.config.progress_callback_frequency == 0 {
188            self.report_progress();
189        }
190        
191        // Flush if buffer is getting large
192        self.flush_if_needed()?;
193        
194        Ok(resource_ref)
195    }
196    
197    /// Finish the resource section and start the release section
198    pub fn finish_resources_start_releases(&mut self) -> Result<(), BuildError> {
199        if !self.message_started || self.message_finished {
200            return Err(BuildError::XmlGeneration("Message not in valid state".to_string()));
201        }
202        
203        // End ResourceList and start ReleaseList
204        self.xml_buffer.extend_from_slice(b"  </ResourceList>\n");
205        self.xml_buffer.extend_from_slice(b"  <ReleaseList>\n");
206        
207        self.flush_if_needed()?;
208        Ok(())
209    }
210    
211    /// Write a single release to the stream
212    pub fn write_release(&mut self, 
213                        release_id: &str,
214                        title: &str, 
215                        artist: &str,
216                        label: Option<&str>,
217                        upc: Option<&str>,
218                        release_date: Option<&str>,
219                        genre: Option<&str>,
220                        resource_references: &[String]) -> Result<String, BuildError> {
221        if !self.message_started || self.message_finished {
222            return Err(BuildError::XmlGeneration("Message not in valid state for writing releases".to_string()));
223        }
224        
225        // Generate stable reference for this release
226        let release_ref = self.reference_manager.generate_release_reference(release_id)?;
227        
228        // Build Release XML
229        let mut release_xml = String::new();
230        release_xml.push_str("    <Release>\n");
231        release_xml.push_str(&format!("      <ReleaseReference>{}</ReleaseReference>\n", release_ref));
232        release_xml.push_str(&format!("      <ReleaseId>{}</ReleaseId>\n", escape_xml(release_id)));
233        release_xml.push_str("      <ReleaseType>Album</ReleaseType>\n");
234        release_xml.push_str(&format!("      <Title>{}</Title>\n", escape_xml(title)));
235        release_xml.push_str(&format!("      <DisplayArtist>{}</DisplayArtist>\n", escape_xml(artist)));
236        
237        if let Some(label_value) = label {
238            release_xml.push_str(&format!("      <LabelName>{}</LabelName>\n", escape_xml(label_value)));
239        }
240        
241        if let Some(upc_value) = upc {
242            release_xml.push_str(&format!("      <UPC>{}</UPC>\n", escape_xml(upc_value)));
243        }
244        
245        if let Some(date_value) = release_date {
246            release_xml.push_str(&format!("      <ReleaseDate>{}</ReleaseDate>\n", escape_xml(date_value)));
247        }
248        
249        if let Some(genre_value) = genre {
250            release_xml.push_str(&format!("      <Genre>{}</Genre>\n", escape_xml(genre_value)));
251        }
252        
253        // Write ResourceGroup linking to resources
254        if !resource_references.is_empty() {
255            release_xml.push_str("      <ResourceGroup>\n");
256            for resource_ref in resource_references {
257                release_xml.push_str(&format!("        <ResourceReference>{}</ResourceReference>\n", resource_ref));
258            }
259            release_xml.push_str("      </ResourceGroup>\n");
260        }
261        
262        release_xml.push_str("    </Release>\n");
263        
264        self.xml_buffer.extend_from_slice(release_xml.as_bytes());
265        
266        self.releases_written += 1;
267        
268        // Check for progress callback
269        if self.releases_written % self.config.progress_callback_frequency == 0 {
270            self.report_progress();
271        }
272        
273        // Flush if buffer is getting large
274        self.flush_if_needed()?;
275        
276        Ok(release_ref)
277    }
278    
279    /// Finish the message and close all tags
280    pub fn finish_message(&mut self) -> Result<StreamingStats, BuildError> {
281        if !self.message_started || self.message_finished {
282            return Err(BuildError::XmlGeneration("Message not in valid state to finish".to_string()));
283        }
284        
285        // End ReleaseList and close root element
286        self.xml_buffer.extend_from_slice(b"  </ReleaseList>\n");
287        self.xml_buffer.extend_from_slice(b"</NewReleaseMessage>\n");
288        
289        // Final flush of any remaining content
290        if !self.xml_buffer.is_empty() {
291            self.buffer_manager.write_chunk(&self.xml_buffer)
292                .map_err(|e| BuildError::XmlGeneration(format!("Failed to write final chunk: {}", e)))?;
293            self.xml_buffer.clear();
294        }
295        
296        // Final flush
297        self.buffer_manager.flush_all()
298            .map_err(|e| BuildError::XmlGeneration(format!("Failed to flush: {}", e)))?;
299        
300        self.message_finished = true;
301        
302        Ok(StreamingStats {
303            releases_written: self.releases_written,
304            resources_written: self.resources_written,
305            deals_written: self.deals_written,
306            bytes_written: self.buffer_manager.total_bytes_written(),
307            warnings: self.warnings.clone(),
308            peak_memory_usage: self.buffer_manager.peak_buffer_size(),
309        })
310    }
311    
312    // Private helper methods
313    
314    fn write_message_header(&mut self, header: &MessageHeaderRequest) -> Result<(), BuildError> {
315        // Generate message ID if not provided
316        let default_id = Uuid::new_v4().to_string();
317        let message_id = header.message_id.as_deref()
318            .unwrap_or(&default_id);
319        
320        let mut header_xml = String::new();
321        header_xml.push_str("  <MessageHeader>\n");
322        header_xml.push_str(&format!("    <MessageId>{}</MessageId>\n", escape_xml(message_id)));
323        
324        // Write MessageSender
325        header_xml.push_str("    <MessageSender>\n");
326        if !header.message_sender.party_name.is_empty() {
327            header_xml.push_str(&format!("      <PartyName>{}</PartyName>\n", 
328                               escape_xml(&header.message_sender.party_name[0].text)));
329        }
330        header_xml.push_str("    </MessageSender>\n");
331        
332        // Write MessageRecipient
333        header_xml.push_str("    <MessageRecipient>\n");
334        if !header.message_recipient.party_name.is_empty() {
335            header_xml.push_str(&format!("      <PartyName>{}</PartyName>\n", 
336                               escape_xml(&header.message_recipient.party_name[0].text)));
337        }
338        header_xml.push_str("    </MessageRecipient>\n");
339        
340        // Write MessageCreatedDateTime
341        let default_time = chrono::Utc::now().to_rfc3339();
342        let created_time = header.message_created_date_time.as_deref()
343            .unwrap_or(&default_time);
344        header_xml.push_str(&format!("    <MessageCreatedDateTime>{}</MessageCreatedDateTime>\n", 
345                           escape_xml(created_time)));
346        
347        header_xml.push_str("  </MessageHeader>\n");
348        
349        self.xml_buffer.extend_from_slice(header_xml.as_bytes());
350        Ok(())
351    }
352    
353    fn flush_if_needed(&mut self) -> Result<(), BuildError> {
354        // Check if XML buffer is getting large and flush it
355        if self.xml_buffer.len() >= self.config.max_buffer_size {
356            self.buffer_manager.write_chunk(&self.xml_buffer)
357                .map_err(|e| BuildError::XmlGeneration(format!("Failed to flush XML buffer: {}", e)))?;
358            self.xml_buffer.clear();
359        }
360        Ok(())
361    }
362    
363    fn report_progress(&self) {
364        if let Some(ref callback) = self.progress_callback {
365            let current_memory = self.xml_buffer.len() + 
366                                self.buffer_manager.current_buffer_size();
367            
368            let completion_percent = if let Some(total) = self.estimated_total_items {
369                Some(((self.releases_written + self.resources_written) as f64 / total as f64) * 100.0)
370            } else {
371                None
372            };
373            
374            let progress = StreamingProgress {
375                releases_written: self.releases_written,
376                resources_written: self.resources_written,
377                bytes_written: self.buffer_manager.total_bytes_written(),
378                current_memory_usage: current_memory,
379                estimated_completion_percent: completion_percent,
380            };
381            
382            callback(progress);
383        }
384    }
385}
386
387/// Statistics from a completed streaming operation
388#[derive(Debug, Clone)]
389pub struct StreamingStats {
390    pub releases_written: usize,
391    pub resources_written: usize,
392    pub deals_written: usize,
393    pub bytes_written: usize,
394    pub warnings: Vec<BuildWarning>,
395    pub peak_memory_usage: usize,
396}
397
398/// Custom error type for streaming operations
399#[derive(Debug, thiserror::Error)]
400pub enum StreamingError {
401    #[error("Invalid state: {message}")]
402    InvalidState { message: String },
403    
404    #[error("I/O error: {0}")]
405    IoError(#[from] std::io::Error),
406    
407    #[error("XML writing error: {0}")]
408    XmlError(#[from] quick_xml::Error),
409    
410    #[error("Build error: {0}")]
411    BuildError(#[from] BuildError),
412}
413
414impl From<StreamingError> for BuildError {
415    fn from(err: StreamingError) -> Self {
416        match err {
417            StreamingError::InvalidState { message } => BuildError::XmlGeneration(message),
418            StreamingError::IoError(e) => BuildError::XmlGeneration(format!("I/O error: {}", e)),
419            StreamingError::XmlError(e) => BuildError::XmlGeneration(format!("XML error: {}", e)),
420            StreamingError::BuildError(e) => e,
421        }
422    }
423}
424
/// Escape the five XML special characters (`&`, `<`, `>`, `"`, `'`).
///
/// The result can be embedded in element content or in a quoted attribute
/// value. Every other character is copied through unchanged, in a single pass.
fn escape_xml(text: &str) -> String {
    let mut escaped = String::with_capacity(text.len());
    for ch in text.chars() {
        match ch {
            '&' => escaped.push_str("&amp;"),
            '<' => escaped.push_str("&lt;"),
            '>' => escaped.push_str("&gt;"),
            '"' => escaped.push_str("&quot;"),
            '\'' => escaped.push_str("&apos;"),
            other => escaped.push(other),
        }
    }
    escaped
}