1pub mod buffer_manager;
8pub mod reference_manager;
9
10use crate::builder::MessageHeaderRequest;
11use crate::determinism::DeterminismConfig;
12use crate::error::{BuildError, BuildWarning};
13use buffer_manager::BufferManager;
14use reference_manager::StreamingReferenceManager;
15use std::io::Write as IoWrite;
16use uuid::Uuid;
17
18#[derive(Debug, Clone)]
37pub struct StreamingConfig {
38    pub max_buffer_size: usize,
40    pub deterministic: bool,
42    pub determinism_config: DeterminismConfig,
44    pub validate_during_stream: bool,
46    pub progress_callback_frequency: usize,
48}
49
50impl Default for StreamingConfig {
51    fn default() -> Self {
52        Self {
53            max_buffer_size: 10 * 1024 * 1024, deterministic: true,
55            determinism_config: DeterminismConfig::default(),
56            validate_during_stream: true,
57            progress_callback_frequency: 100,
58        }
59    }
60}
61
62#[derive(Debug, Clone)]
80pub struct StreamingProgress {
81    pub releases_written: usize,
83    pub resources_written: usize,
85    pub bytes_written: usize,
87    pub current_memory_usage: usize,
89    pub estimated_completion_percent: Option<f64>,
91}
92
93pub type ProgressCallback = Box<dyn Fn(StreamingProgress) + Send + Sync>;
95
96pub struct StreamingBuilder<W: IoWrite> {
98    buffer_manager: BufferManager<W>,
99    reference_manager: StreamingReferenceManager,
100    config: StreamingConfig,
101    xml_buffer: Vec<u8>,
102
103    message_started: bool,
105    message_finished: bool,
106    releases_written: usize,
107    resources_written: usize,
108    deals_written: usize,
109    warnings: Vec<BuildWarning>,
110
111    progress_callback: Option<ProgressCallback>,
113    estimated_total_items: Option<usize>,
114}
115
116impl<W: IoWrite> StreamingBuilder<W> {
117    pub fn new(writer: W) -> Result<Self, BuildError> {
119        Self::new_with_config(writer, StreamingConfig::default())
120    }
121
122    pub fn new_with_config(writer: W, config: StreamingConfig) -> Result<Self, BuildError> {
124        let buffer_manager = BufferManager::new(writer, config.max_buffer_size).map_err(|e| {
125            BuildError::XmlGeneration(format!("Failed to create buffer manager: {}", e))
126        })?;
127
128        Ok(StreamingBuilder {
129            buffer_manager,
130            reference_manager: StreamingReferenceManager::new(),
131            config,
132            xml_buffer: Vec::new(),
133            message_started: false,
134            message_finished: false,
135            releases_written: 0,
136            resources_written: 0,
137            deals_written: 0,
138            warnings: Vec::new(),
139            progress_callback: None,
140            estimated_total_items: None,
141        })
142    }
143
144    pub fn set_progress_callback(&mut self, callback: ProgressCallback) {
146        self.progress_callback = Some(callback);
147    }
148
149    pub fn set_estimated_total(&mut self, total: usize) {
151        self.estimated_total_items = Some(total);
152    }
153
154    pub fn start_message(
156        &mut self,
157        header: &MessageHeaderRequest,
158        version: &str,
159    ) -> Result<(), BuildError> {
160        if self.message_started {
161            return Err(BuildError::XmlGeneration(
162                "Message already started".to_string(),
163            ));
164        }
165
166        let xml_start = format!(
168            r#"<?xml version="1.0" encoding="UTF-8"?>
169<NewReleaseMessage xmlns="http://ddex.net/xml/ern/43" MessageSchemaVersionId="{}" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
170"#,
171            version
172        );
173
174        self.xml_buffer.extend_from_slice(xml_start.as_bytes());
175
176        self.write_message_header(header)?;
178
179        self.xml_buffer.extend_from_slice(b"  <ResourceList>\n");
181
182        self.message_started = true;
183        self.flush_if_needed()?;
184
185        Ok(())
186    }
187
188    pub fn write_resource(
190        &mut self,
191        resource_id: &str,
192        title: &str,
193        artist: &str,
194        isrc: Option<&str>,
195        duration: Option<&str>,
196        file_path: Option<&str>,
197    ) -> Result<String, BuildError> {
198        if !self.message_started || self.message_finished {
199            return Err(BuildError::XmlGeneration(
200                "Message not in valid state for writing resources".to_string(),
201            ));
202        }
203
204        let resource_ref = self
206            .reference_manager
207            .generate_resource_reference(resource_id)?;
208
209        let mut resource_xml = String::new();
211        resource_xml.push_str("    <SoundRecording>\n");
212        resource_xml.push_str(&format!(
213            "      <ResourceReference>{}</ResourceReference>\n",
214            resource_ref
215        ));
216        resource_xml.push_str("      <Type>SoundRecording</Type>\n");
217        resource_xml.push_str(&format!(
218            "      <ResourceId>{}</ResourceId>\n",
219            escape_xml(resource_id)
220        ));
221        resource_xml.push_str(&format!(
222            "      <ReferenceTitle>{}</ReferenceTitle>\n",
223            escape_xml(title)
224        ));
225        resource_xml.push_str(&format!(
226            "      <DisplayArtist>{}</DisplayArtist>\n",
227            escape_xml(artist)
228        ));
229
230        if let Some(isrc_value) = isrc {
231            resource_xml.push_str(&format!("      <ISRC>{}</ISRC>\n", escape_xml(isrc_value)));
232        }
233
234        if let Some(duration_value) = duration {
235            resource_xml.push_str(&format!(
236                "      <Duration>{}</Duration>\n",
237                escape_xml(duration_value)
238            ));
239        }
240
241        if let Some(file_path_value) = file_path {
242            resource_xml.push_str("      <TechnicalResourceDetails>\n");
243            resource_xml.push_str(&format!(
244                "        <FileName>{}</FileName>\n",
245                escape_xml(file_path_value)
246            ));
247            resource_xml.push_str("        <AudioCodecType>MP3</AudioCodecType>\n");
248            resource_xml.push_str("      </TechnicalResourceDetails>\n");
249        }
250
251        resource_xml.push_str("    </SoundRecording>\n");
252
253        self.xml_buffer.extend_from_slice(resource_xml.as_bytes());
254
255        self.resources_written += 1;
256
257        if self.resources_written % self.config.progress_callback_frequency == 0 {
259            self.report_progress();
260        }
261
262        self.flush_if_needed()?;
264
265        Ok(resource_ref)
266    }
267
268    pub fn finish_resources_start_releases(&mut self) -> Result<(), BuildError> {
270        if !self.message_started || self.message_finished {
271            return Err(BuildError::XmlGeneration(
272                "Message not in valid state".to_string(),
273            ));
274        }
275
276        self.xml_buffer.extend_from_slice(b"  </ResourceList>\n");
278        self.xml_buffer.extend_from_slice(b"  <ReleaseList>\n");
279
280        self.flush_if_needed()?;
281        Ok(())
282    }
283
284    pub fn write_release(
286        &mut self,
287        release_id: &str,
288        title: &str,
289        artist: &str,
290        label: Option<&str>,
291        upc: Option<&str>,
292        release_date: Option<&str>,
293        genre: Option<&str>,
294        resource_references: &[String],
295    ) -> Result<String, BuildError> {
296        if !self.message_started || self.message_finished {
297            return Err(BuildError::XmlGeneration(
298                "Message not in valid state for writing releases".to_string(),
299            ));
300        }
301
302        let release_ref = self
304            .reference_manager
305            .generate_release_reference(release_id)?;
306
307        let mut release_xml = String::new();
309        release_xml.push_str("    <Release>\n");
310        release_xml.push_str(&format!(
311            "      <ReleaseReference>{}</ReleaseReference>\n",
312            release_ref
313        ));
314        release_xml.push_str(&format!(
315            "      <ReleaseId>{}</ReleaseId>\n",
316            escape_xml(release_id)
317        ));
318        release_xml.push_str("      <ReleaseType>Album</ReleaseType>\n");
319        release_xml.push_str(&format!("      <Title>{}</Title>\n", escape_xml(title)));
320        release_xml.push_str(&format!(
321            "      <DisplayArtist>{}</DisplayArtist>\n",
322            escape_xml(artist)
323        ));
324
325        if let Some(label_value) = label {
326            release_xml.push_str(&format!(
327                "      <LabelName>{}</LabelName>\n",
328                escape_xml(label_value)
329            ));
330        }
331
332        if let Some(upc_value) = upc {
333            release_xml.push_str(&format!("      <UPC>{}</UPC>\n", escape_xml(upc_value)));
334        }
335
336        if let Some(date_value) = release_date {
337            release_xml.push_str(&format!(
338                "      <ReleaseDate>{}</ReleaseDate>\n",
339                escape_xml(date_value)
340            ));
341        }
342
343        if let Some(genre_value) = genre {
344            release_xml.push_str(&format!(
345                "      <Genre>{}</Genre>\n",
346                escape_xml(genre_value)
347            ));
348        }
349
350        if !resource_references.is_empty() {
352            release_xml.push_str("      <ResourceGroup>\n");
353            for resource_ref in resource_references {
354                release_xml.push_str(&format!(
355                    "        <ResourceReference>{}</ResourceReference>\n",
356                    resource_ref
357                ));
358            }
359            release_xml.push_str("      </ResourceGroup>\n");
360        }
361
362        release_xml.push_str("    </Release>\n");
363
364        self.xml_buffer.extend_from_slice(release_xml.as_bytes());
365
366        self.releases_written += 1;
367
368        if self.releases_written % self.config.progress_callback_frequency == 0 {
370            self.report_progress();
371        }
372
373        self.flush_if_needed()?;
375
376        Ok(release_ref)
377    }
378
379    pub fn finish_message(&mut self) -> Result<StreamingStats, BuildError> {
381        if !self.message_started || self.message_finished {
382            return Err(BuildError::XmlGeneration(
383                "Message not in valid state to finish".to_string(),
384            ));
385        }
386
387        self.xml_buffer.extend_from_slice(b"  </ReleaseList>\n");
389        self.xml_buffer.extend_from_slice(b"</NewReleaseMessage>\n");
390
391        if !self.xml_buffer.is_empty() {
393            self.buffer_manager
394                .write_chunk(&self.xml_buffer)
395                .map_err(|e| {
396                    BuildError::XmlGeneration(format!("Failed to write final chunk: {}", e))
397                })?;
398            self.xml_buffer.clear();
399        }
400
401        self.buffer_manager
403            .flush_all()
404            .map_err(|e| BuildError::XmlGeneration(format!("Failed to flush: {}", e)))?;
405
406        self.message_finished = true;
407
408        Ok(StreamingStats {
409            releases_written: self.releases_written,
410            resources_written: self.resources_written,
411            deals_written: self.deals_written,
412            bytes_written: self.buffer_manager.total_bytes_written(),
413            warnings: self.warnings.clone(),
414            peak_memory_usage: self.buffer_manager.peak_buffer_size(),
415        })
416    }
417
418    fn write_message_header(&mut self, header: &MessageHeaderRequest) -> Result<(), BuildError> {
421        let default_id = Uuid::new_v4().to_string();
423        let message_id = header.message_id.as_deref().unwrap_or(&default_id);
424
425        let mut header_xml = String::new();
426        header_xml.push_str("  <MessageHeader>\n");
427        header_xml.push_str(&format!(
428            "    <MessageId>{}</MessageId>\n",
429            escape_xml(message_id)
430        ));
431
432        header_xml.push_str("    <MessageSender>\n");
434        if !header.message_sender.party_name.is_empty() {
435            header_xml.push_str(&format!(
436                "      <PartyName>{}</PartyName>\n",
437                escape_xml(&header.message_sender.party_name[0].text)
438            ));
439        }
440        header_xml.push_str("    </MessageSender>\n");
441
442        header_xml.push_str("    <MessageRecipient>\n");
444        if !header.message_recipient.party_name.is_empty() {
445            header_xml.push_str(&format!(
446                "      <PartyName>{}</PartyName>\n",
447                escape_xml(&header.message_recipient.party_name[0].text)
448            ));
449        }
450        header_xml.push_str("    </MessageRecipient>\n");
451
452        let default_time = chrono::Utc::now().to_rfc3339();
454        let created_time = header
455            .message_created_date_time
456            .as_deref()
457            .unwrap_or(&default_time);
458        header_xml.push_str(&format!(
459            "    <MessageCreatedDateTime>{}</MessageCreatedDateTime>\n",
460            escape_xml(created_time)
461        ));
462
463        header_xml.push_str("  </MessageHeader>\n");
464
465        self.xml_buffer.extend_from_slice(header_xml.as_bytes());
466        Ok(())
467    }
468
469    fn flush_if_needed(&mut self) -> Result<(), BuildError> {
470        if self.xml_buffer.len() >= self.config.max_buffer_size {
472            self.buffer_manager
473                .write_chunk(&self.xml_buffer)
474                .map_err(|e| {
475                    BuildError::XmlGeneration(format!("Failed to flush XML buffer: {}", e))
476                })?;
477            self.xml_buffer.clear();
478        }
479        Ok(())
480    }
481
482    fn report_progress(&self) {
483        if let Some(ref callback) = self.progress_callback {
484            let current_memory = self.xml_buffer.len() + self.buffer_manager.current_buffer_size();
485
486            let completion_percent = if let Some(total) = self.estimated_total_items {
487                Some(
488                    ((self.releases_written + self.resources_written) as f64 / total as f64)
489                        * 100.0,
490                )
491            } else {
492                None
493            };
494
495            let progress = StreamingProgress {
496                releases_written: self.releases_written,
497                resources_written: self.resources_written,
498                bytes_written: self.buffer_manager.total_bytes_written(),
499                current_memory_usage: current_memory,
500                estimated_completion_percent: completion_percent,
501            };
502
503            callback(progress);
504        }
505    }
506}
507
508#[derive(Debug)]
530pub struct StreamingResult {
531    pub releases_written: usize,
533    pub resources_written: usize,
535    pub deals_written: usize,
537    pub bytes_written: usize,
539    pub warnings: Vec<BuildWarning>,
541    pub peak_memory_usage: usize,
543}
544
545#[derive(Debug, Clone)]
565pub struct StreamingStats {
566    pub releases_written: usize,
568    pub resources_written: usize,
570    pub deals_written: usize,
572    pub bytes_written: usize,
574    pub warnings: Vec<BuildWarning>,
576    pub peak_memory_usage: usize,
578}
579
580#[derive(Debug, thiserror::Error)]
601pub enum StreamingError {
602    #[error("Invalid state: {message}")]
604    InvalidState {
605        message: String,
607    },
608
609    #[error("I/O error: {0}")]
611    IoError(#[from] std::io::Error),
612
613    #[error("XML writing error: {0}")]
615    XmlError(#[from] quick_xml::Error),
616
617    #[error("Build error: {0}")]
619    BuildError(#[from] BuildError),
620}
621
622impl From<StreamingError> for BuildError {
623    fn from(err: StreamingError) -> Self {
624        match err {
625            StreamingError::InvalidState { message } => BuildError::XmlGeneration(message),
626            StreamingError::IoError(e) => BuildError::XmlGeneration(format!("I/O error: {}", e)),
627            StreamingError::XmlError(e) => BuildError::XmlGeneration(format!("XML error: {}", e)),
628            StreamingError::BuildError(e) => e,
629        }
630    }
631}
632
633fn escape_xml(text: &str) -> String {
635    text.replace('&', "&")
636        .replace('<', "<")
637        .replace('>', ">")
638        .replace('"', """)
639        .replace('\'', "'")
640}