1pub mod buffer_manager;
8pub mod reference_manager;
9
10use crate::builder::MessageHeaderRequest;
11use crate::determinism::DeterminismConfig;
12use crate::error::{BuildError, BuildWarning};
13use buffer_manager::BufferManager;
14use reference_manager::StreamingReferenceManager;
15use std::io::Write as IoWrite;
16use uuid::Uuid;
17
18#[derive(Debug, Clone)]
37pub struct StreamingConfig {
38 pub max_buffer_size: usize,
40 pub deterministic: bool,
42 pub determinism_config: DeterminismConfig,
44 pub validate_during_stream: bool,
46 pub progress_callback_frequency: usize,
48}
49
50impl Default for StreamingConfig {
51 fn default() -> Self {
52 Self {
53 max_buffer_size: 10 * 1024 * 1024, deterministic: true,
55 determinism_config: DeterminismConfig::default(),
56 validate_during_stream: true,
57 progress_callback_frequency: 100,
58 }
59 }
60}
61
/// Snapshot of streaming progress handed to a progress callback.
#[derive(Debug, Clone)]
pub struct StreamingProgress {
    /// Number of releases written so far.
    pub releases_written: usize,
    /// Number of resources written so far.
    pub resources_written: usize,
    /// Byte count reported by the buffer manager as already written.
    pub bytes_written: usize,
    /// Bytes currently held in memory: the builder's pending XML plus the buffer manager's
    /// current buffer.
    pub current_memory_usage: usize,
    /// Completion estimate in percent, or `None` when no estimate is available.
    pub estimated_completion_percent: Option<f64>,
}
92
93pub type ProgressCallback = Box<dyn Fn(StreamingProgress) + Send + Sync>;
95
96pub struct StreamingBuilder<W: IoWrite> {
98 buffer_manager: BufferManager<W>,
99 reference_manager: StreamingReferenceManager,
100 config: StreamingConfig,
101 xml_buffer: Vec<u8>,
102
103 message_started: bool,
105 message_finished: bool,
106 releases_written: usize,
107 resources_written: usize,
108 deals_written: usize,
109 warnings: Vec<BuildWarning>,
110
111 progress_callback: Option<ProgressCallback>,
113 estimated_total_items: Option<usize>,
114}
115
116impl<W: IoWrite> StreamingBuilder<W> {
117 pub fn new(writer: W) -> Result<Self, BuildError> {
119 Self::new_with_config(writer, StreamingConfig::default())
120 }
121
122 pub fn new_with_config(writer: W, config: StreamingConfig) -> Result<Self, BuildError> {
124 let buffer_manager = BufferManager::new(writer, config.max_buffer_size).map_err(|e| {
125 BuildError::XmlGeneration(format!("Failed to create buffer manager: {}", e))
126 })?;
127
128 Ok(StreamingBuilder {
129 buffer_manager,
130 reference_manager: StreamingReferenceManager::new(),
131 config,
132 xml_buffer: Vec::new(),
133 message_started: false,
134 message_finished: false,
135 releases_written: 0,
136 resources_written: 0,
137 deals_written: 0,
138 warnings: Vec::new(),
139 progress_callback: None,
140 estimated_total_items: None,
141 })
142 }
143
144 pub fn set_progress_callback(&mut self, callback: ProgressCallback) {
146 self.progress_callback = Some(callback);
147 }
148
149 pub fn set_estimated_total(&mut self, total: usize) {
151 self.estimated_total_items = Some(total);
152 }
153
154 pub fn start_message(
156 &mut self,
157 header: &MessageHeaderRequest,
158 version: &str,
159 ) -> Result<(), BuildError> {
160 if self.message_started {
161 return Err(BuildError::XmlGeneration(
162 "Message already started".to_string(),
163 ));
164 }
165
166 let xml_start = format!(
168 r#"<?xml version="1.0" encoding="UTF-8"?>
169<NewReleaseMessage xmlns="http://ddex.net/xml/ern/43" MessageSchemaVersionId="{}" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
170"#,
171 version
172 );
173
174 self.xml_buffer.extend_from_slice(xml_start.as_bytes());
175
176 self.write_message_header(header)?;
178
179 self.xml_buffer.extend_from_slice(b" <ResourceList>\n");
181
182 self.message_started = true;
183 self.flush_if_needed()?;
184
185 Ok(())
186 }
187
188 pub fn write_resource(
190 &mut self,
191 resource_id: &str,
192 title: &str,
193 artist: &str,
194 isrc: Option<&str>,
195 duration: Option<&str>,
196 file_path: Option<&str>,
197 ) -> Result<String, BuildError> {
198 if !self.message_started || self.message_finished {
199 return Err(BuildError::XmlGeneration(
200 "Message not in valid state for writing resources".to_string(),
201 ));
202 }
203
204 let resource_ref = self
206 .reference_manager
207 .generate_resource_reference(resource_id)?;
208
209 let mut resource_xml = String::new();
211 resource_xml.push_str(" <SoundRecording>\n");
212 resource_xml.push_str(&format!(
213 " <ResourceReference>{}</ResourceReference>\n",
214 resource_ref
215 ));
216 resource_xml.push_str(" <Type>SoundRecording</Type>\n");
217 resource_xml.push_str(&format!(
218 " <ResourceId>{}</ResourceId>\n",
219 escape_xml(resource_id)
220 ));
221 resource_xml.push_str(&format!(
222 " <ReferenceTitle>{}</ReferenceTitle>\n",
223 escape_xml(title)
224 ));
225 resource_xml.push_str(&format!(
226 " <DisplayArtist>{}</DisplayArtist>\n",
227 escape_xml(artist)
228 ));
229
230 if let Some(isrc_value) = isrc {
231 resource_xml.push_str(&format!(" <ISRC>{}</ISRC>\n", escape_xml(isrc_value)));
232 }
233
234 if let Some(duration_value) = duration {
235 resource_xml.push_str(&format!(
236 " <Duration>{}</Duration>\n",
237 escape_xml(duration_value)
238 ));
239 }
240
241 if let Some(file_path_value) = file_path {
242 resource_xml.push_str(" <TechnicalResourceDetails>\n");
243 resource_xml.push_str(&format!(
244 " <FileName>{}</FileName>\n",
245 escape_xml(file_path_value)
246 ));
247 resource_xml.push_str(" <AudioCodecType>MP3</AudioCodecType>\n");
248 resource_xml.push_str(" </TechnicalResourceDetails>\n");
249 }
250
251 resource_xml.push_str(" </SoundRecording>\n");
252
253 self.xml_buffer.extend_from_slice(resource_xml.as_bytes());
254
255 self.resources_written += 1;
256
257 if self.resources_written % self.config.progress_callback_frequency == 0 {
259 self.report_progress();
260 }
261
262 self.flush_if_needed()?;
264
265 Ok(resource_ref)
266 }
267
268 pub fn finish_resources_start_releases(&mut self) -> Result<(), BuildError> {
270 if !self.message_started || self.message_finished {
271 return Err(BuildError::XmlGeneration(
272 "Message not in valid state".to_string(),
273 ));
274 }
275
276 self.xml_buffer.extend_from_slice(b" </ResourceList>\n");
278 self.xml_buffer.extend_from_slice(b" <ReleaseList>\n");
279
280 self.flush_if_needed()?;
281 Ok(())
282 }
283
284 pub fn write_release(
286 &mut self,
287 release_id: &str,
288 title: &str,
289 artist: &str,
290 label: Option<&str>,
291 upc: Option<&str>,
292 release_date: Option<&str>,
293 genre: Option<&str>,
294 resource_references: &[String],
295 ) -> Result<String, BuildError> {
296 if !self.message_started || self.message_finished {
297 return Err(BuildError::XmlGeneration(
298 "Message not in valid state for writing releases".to_string(),
299 ));
300 }
301
302 let release_ref = self
304 .reference_manager
305 .generate_release_reference(release_id)?;
306
307 let mut release_xml = String::new();
309 release_xml.push_str(" <Release>\n");
310 release_xml.push_str(&format!(
311 " <ReleaseReference>{}</ReleaseReference>\n",
312 release_ref
313 ));
314 release_xml.push_str(&format!(
315 " <ReleaseId>{}</ReleaseId>\n",
316 escape_xml(release_id)
317 ));
318 release_xml.push_str(" <ReleaseType>Album</ReleaseType>\n");
319 release_xml.push_str(&format!(" <Title>{}</Title>\n", escape_xml(title)));
320 release_xml.push_str(&format!(
321 " <DisplayArtist>{}</DisplayArtist>\n",
322 escape_xml(artist)
323 ));
324
325 if let Some(label_value) = label {
326 release_xml.push_str(&format!(
327 " <LabelName>{}</LabelName>\n",
328 escape_xml(label_value)
329 ));
330 }
331
332 if let Some(upc_value) = upc {
333 release_xml.push_str(&format!(" <UPC>{}</UPC>\n", escape_xml(upc_value)));
334 }
335
336 if let Some(date_value) = release_date {
337 release_xml.push_str(&format!(
338 " <ReleaseDate>{}</ReleaseDate>\n",
339 escape_xml(date_value)
340 ));
341 }
342
343 if let Some(genre_value) = genre {
344 release_xml.push_str(&format!(
345 " <Genre>{}</Genre>\n",
346 escape_xml(genre_value)
347 ));
348 }
349
350 if !resource_references.is_empty() {
352 release_xml.push_str(" <ResourceGroup>\n");
353 for resource_ref in resource_references {
354 release_xml.push_str(&format!(
355 " <ResourceReference>{}</ResourceReference>\n",
356 resource_ref
357 ));
358 }
359 release_xml.push_str(" </ResourceGroup>\n");
360 }
361
362 release_xml.push_str(" </Release>\n");
363
364 self.xml_buffer.extend_from_slice(release_xml.as_bytes());
365
366 self.releases_written += 1;
367
368 if self.releases_written % self.config.progress_callback_frequency == 0 {
370 self.report_progress();
371 }
372
373 self.flush_if_needed()?;
375
376 Ok(release_ref)
377 }
378
379 pub fn finish_message(&mut self) -> Result<StreamingStats, BuildError> {
381 if !self.message_started || self.message_finished {
382 return Err(BuildError::XmlGeneration(
383 "Message not in valid state to finish".to_string(),
384 ));
385 }
386
387 self.xml_buffer.extend_from_slice(b" </ReleaseList>\n");
389 self.xml_buffer.extend_from_slice(b"</NewReleaseMessage>\n");
390
391 if !self.xml_buffer.is_empty() {
393 self.buffer_manager
394 .write_chunk(&self.xml_buffer)
395 .map_err(|e| {
396 BuildError::XmlGeneration(format!("Failed to write final chunk: {}", e))
397 })?;
398 self.xml_buffer.clear();
399 }
400
401 self.buffer_manager
403 .flush_all()
404 .map_err(|e| BuildError::XmlGeneration(format!("Failed to flush: {}", e)))?;
405
406 self.message_finished = true;
407
408 Ok(StreamingStats {
409 releases_written: self.releases_written,
410 resources_written: self.resources_written,
411 deals_written: self.deals_written,
412 bytes_written: self.buffer_manager.total_bytes_written(),
413 warnings: self.warnings.clone(),
414 peak_memory_usage: self.buffer_manager.peak_buffer_size(),
415 })
416 }
417
418 fn write_message_header(&mut self, header: &MessageHeaderRequest) -> Result<(), BuildError> {
421 let default_id = Uuid::new_v4().to_string();
423 let message_id = header.message_id.as_deref().unwrap_or(&default_id);
424
425 let mut header_xml = String::new();
426 header_xml.push_str(" <MessageHeader>\n");
427 header_xml.push_str(&format!(
428 " <MessageId>{}</MessageId>\n",
429 escape_xml(message_id)
430 ));
431
432 header_xml.push_str(" <MessageSender>\n");
434 if !header.message_sender.party_name.is_empty() {
435 header_xml.push_str(&format!(
436 " <PartyName>{}</PartyName>\n",
437 escape_xml(&header.message_sender.party_name[0].text)
438 ));
439 }
440 header_xml.push_str(" </MessageSender>\n");
441
442 header_xml.push_str(" <MessageRecipient>\n");
444 if !header.message_recipient.party_name.is_empty() {
445 header_xml.push_str(&format!(
446 " <PartyName>{}</PartyName>\n",
447 escape_xml(&header.message_recipient.party_name[0].text)
448 ));
449 }
450 header_xml.push_str(" </MessageRecipient>\n");
451
452 let default_time = chrono::Utc::now().to_rfc3339();
454 let created_time = header
455 .message_created_date_time
456 .as_deref()
457 .unwrap_or(&default_time);
458 header_xml.push_str(&format!(
459 " <MessageCreatedDateTime>{}</MessageCreatedDateTime>\n",
460 escape_xml(created_time)
461 ));
462
463 header_xml.push_str(" </MessageHeader>\n");
464
465 self.xml_buffer.extend_from_slice(header_xml.as_bytes());
466 Ok(())
467 }
468
469 fn flush_if_needed(&mut self) -> Result<(), BuildError> {
470 if self.xml_buffer.len() >= self.config.max_buffer_size {
472 self.buffer_manager
473 .write_chunk(&self.xml_buffer)
474 .map_err(|e| {
475 BuildError::XmlGeneration(format!("Failed to flush XML buffer: {}", e))
476 })?;
477 self.xml_buffer.clear();
478 }
479 Ok(())
480 }
481
482 fn report_progress(&self) {
483 if let Some(ref callback) = self.progress_callback {
484 let current_memory = self.xml_buffer.len() + self.buffer_manager.current_buffer_size();
485
486 let completion_percent = if let Some(total) = self.estimated_total_items {
487 Some(
488 ((self.releases_written + self.resources_written) as f64 / total as f64)
489 * 100.0,
490 )
491 } else {
492 None
493 };
494
495 let progress = StreamingProgress {
496 releases_written: self.releases_written,
497 resources_written: self.resources_written,
498 bytes_written: self.buffer_manager.total_bytes_written(),
499 current_memory_usage: current_memory,
500 estimated_completion_percent: completion_percent,
501 };
502
503 callback(progress);
504 }
505 }
506}
507
508#[derive(Debug)]
530pub struct StreamingResult {
531 pub releases_written: usize,
533 pub resources_written: usize,
535 pub deals_written: usize,
537 pub bytes_written: usize,
539 pub warnings: Vec<BuildWarning>,
541 pub peak_memory_usage: usize,
543}
544
545#[derive(Debug, Clone)]
565pub struct StreamingStats {
566 pub releases_written: usize,
568 pub resources_written: usize,
570 pub deals_written: usize,
572 pub bytes_written: usize,
574 pub warnings: Vec<BuildWarning>,
576 pub peak_memory_usage: usize,
578}
579
580#[derive(Debug, thiserror::Error)]
601pub enum StreamingError {
602 #[error("Invalid state: {message}")]
604 InvalidState {
605 message: String,
607 },
608
609 #[error("I/O error: {0}")]
611 IoError(#[from] std::io::Error),
612
613 #[error("XML writing error: {0}")]
615 XmlError(#[from] quick_xml::Error),
616
617 #[error("Build error: {0}")]
619 BuildError(#[from] BuildError),
620}
621
622impl From<StreamingError> for BuildError {
623 fn from(err: StreamingError) -> Self {
624 match err {
625 StreamingError::InvalidState { message } => BuildError::XmlGeneration(message),
626 StreamingError::IoError(e) => BuildError::XmlGeneration(format!("I/O error: {}", e)),
627 StreamingError::XmlError(e) => BuildError::XmlGeneration(format!("XML error: {}", e)),
628 StreamingError::BuildError(e) => e,
629 }
630 }
631}
632
/// Escapes the five characters with predefined XML entities so `text` can be embedded in
/// element content or in a quoted attribute value.
///
/// `&`, `<`, `>`, `"` and `'` become `&amp;`, `&lt;`, `&gt;`, `&quot;` and `&apos;`; every
/// other character is copied unchanged. The input is scanned once, so an ampersand that is
/// already part of an entity is escaped again (`&amp;` becomes `&amp;amp;`).
fn escape_xml(text: &str) -> String {
    // Escaping only ever grows the text, so the input length is a good initial capacity.
    let mut escaped = String::with_capacity(text.len());
    for ch in text.chars() {
        match ch {
            '&' => escaped.push_str("&amp;"),
            '<' => escaped.push_str("&lt;"),
            '>' => escaped.push_str("&gt;"),
            '"' => escaped.push_str("&quot;"),
            '\'' => escaped.push_str("&apos;"),
            other => escaped.push(other),
        }
    }
    escaped
}