1pub mod buffer_manager;
8pub mod reference_manager;
9
10use crate::builder::MessageHeaderRequest;
11use crate::error::{BuildError, BuildWarning};
12use crate::determinism::DeterminismConfig;
13use buffer_manager::BufferManager;
14use reference_manager::StreamingReferenceManager;
15use std::io::Write as IoWrite;
16use uuid::Uuid;
17
18#[derive(Debug, Clone)]
20pub struct StreamingConfig {
21 pub max_buffer_size: usize,
23 pub deterministic: bool,
25 pub determinism_config: DeterminismConfig,
27 pub validate_during_stream: bool,
29 pub progress_callback_frequency: usize,
31}
32
33impl Default for StreamingConfig {
34 fn default() -> Self {
35 Self {
36 max_buffer_size: 10 * 1024 * 1024, deterministic: true,
38 determinism_config: DeterminismConfig::default(),
39 validate_during_stream: true,
40 progress_callback_frequency: 100,
41 }
42 }
43}
44
/// Snapshot of streaming progress handed to a progress callback.
#[derive(Clone, Debug)]
pub struct StreamingProgress {
    /// Number of releases written so far.
    pub releases_written: usize,

    /// Number of resources written so far.
    pub resources_written: usize,

    /// Byte count reported by the buffer manager at the time of the snapshot.
    pub bytes_written: usize,

    /// Bytes currently held in memory by the builder's staging buffer plus
    /// the buffer manager's own buffer.
    pub current_memory_usage: usize,

    /// Percentage of the estimated total that has been written, or `None`
    /// when no estimate was supplied.
    pub estimated_completion_percent: Option<f64>,
}
54
55pub type ProgressCallback = Box<dyn Fn(StreamingProgress) + Send + Sync>;
57
58pub struct StreamingBuilder<W: IoWrite> {
60 buffer_manager: BufferManager<W>,
61 reference_manager: StreamingReferenceManager,
62 config: StreamingConfig,
63 xml_buffer: Vec<u8>,
64
65 message_started: bool,
67 message_finished: bool,
68 releases_written: usize,
69 resources_written: usize,
70 deals_written: usize,
71 warnings: Vec<BuildWarning>,
72
73 progress_callback: Option<ProgressCallback>,
75 estimated_total_items: Option<usize>,
76}
77
78impl<W: IoWrite> StreamingBuilder<W> {
79 pub fn new(writer: W) -> Result<Self, BuildError> {
81 Self::new_with_config(writer, StreamingConfig::default())
82 }
83
84 pub fn new_with_config(writer: W, config: StreamingConfig) -> Result<Self, BuildError> {
86 let buffer_manager = BufferManager::new(writer, config.max_buffer_size)
87 .map_err(|e| BuildError::XmlGeneration(format!("Failed to create buffer manager: {}", e)))?;
88
89 Ok(StreamingBuilder {
90 buffer_manager,
91 reference_manager: StreamingReferenceManager::new(),
92 config,
93 xml_buffer: Vec::new(),
94 message_started: false,
95 message_finished: false,
96 releases_written: 0,
97 resources_written: 0,
98 deals_written: 0,
99 warnings: Vec::new(),
100 progress_callback: None,
101 estimated_total_items: None,
102 })
103 }
104
105 pub fn set_progress_callback(&mut self, callback: ProgressCallback) {
107 self.progress_callback = Some(callback);
108 }
109
110 pub fn set_estimated_total(&mut self, total: usize) {
112 self.estimated_total_items = Some(total);
113 }
114
115 pub fn start_message(&mut self, header: &MessageHeaderRequest, version: &str) -> Result<(), BuildError> {
117 if self.message_started {
118 return Err(BuildError::XmlGeneration("Message already started".to_string()));
119 }
120
121 let xml_start = format!(
123 r#"<?xml version="1.0" encoding="UTF-8"?>
124<NewReleaseMessage xmlns="http://ddex.net/xml/ern/43" MessageSchemaVersionId="{}" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
125"#, version);
126
127 self.xml_buffer.extend_from_slice(xml_start.as_bytes());
128
129 self.write_message_header(header)?;
131
132 self.xml_buffer.extend_from_slice(b" <ResourceList>\n");
134
135 self.message_started = true;
136 self.flush_if_needed()?;
137
138 Ok(())
139 }
140
141 pub fn write_resource(&mut self,
143 resource_id: &str,
144 title: &str,
145 artist: &str,
146 isrc: Option<&str>,
147 duration: Option<&str>,
148 file_path: Option<&str>) -> Result<String, BuildError> {
149 if !self.message_started || self.message_finished {
150 return Err(BuildError::XmlGeneration("Message not in valid state for writing resources".to_string()));
151 }
152
153 let resource_ref = self.reference_manager.generate_resource_reference(resource_id)?;
155
156 let mut resource_xml = String::new();
158 resource_xml.push_str(" <SoundRecording>\n");
159 resource_xml.push_str(&format!(" <ResourceReference>{}</ResourceReference>\n", resource_ref));
160 resource_xml.push_str(" <Type>SoundRecording</Type>\n");
161 resource_xml.push_str(&format!(" <ResourceId>{}</ResourceId>\n", escape_xml(resource_id)));
162 resource_xml.push_str(&format!(" <ReferenceTitle>{}</ReferenceTitle>\n", escape_xml(title)));
163 resource_xml.push_str(&format!(" <DisplayArtist>{}</DisplayArtist>\n", escape_xml(artist)));
164
165 if let Some(isrc_value) = isrc {
166 resource_xml.push_str(&format!(" <ISRC>{}</ISRC>\n", escape_xml(isrc_value)));
167 }
168
169 if let Some(duration_value) = duration {
170 resource_xml.push_str(&format!(" <Duration>{}</Duration>\n", escape_xml(duration_value)));
171 }
172
173 if let Some(file_path_value) = file_path {
174 resource_xml.push_str(" <TechnicalResourceDetails>\n");
175 resource_xml.push_str(&format!(" <FileName>{}</FileName>\n", escape_xml(file_path_value)));
176 resource_xml.push_str(" <AudioCodecType>MP3</AudioCodecType>\n");
177 resource_xml.push_str(" </TechnicalResourceDetails>\n");
178 }
179
180 resource_xml.push_str(" </SoundRecording>\n");
181
182 self.xml_buffer.extend_from_slice(resource_xml.as_bytes());
183
184 self.resources_written += 1;
185
186 if self.resources_written % self.config.progress_callback_frequency == 0 {
188 self.report_progress();
189 }
190
191 self.flush_if_needed()?;
193
194 Ok(resource_ref)
195 }
196
197 pub fn finish_resources_start_releases(&mut self) -> Result<(), BuildError> {
199 if !self.message_started || self.message_finished {
200 return Err(BuildError::XmlGeneration("Message not in valid state".to_string()));
201 }
202
203 self.xml_buffer.extend_from_slice(b" </ResourceList>\n");
205 self.xml_buffer.extend_from_slice(b" <ReleaseList>\n");
206
207 self.flush_if_needed()?;
208 Ok(())
209 }
210
211 pub fn write_release(&mut self,
213 release_id: &str,
214 title: &str,
215 artist: &str,
216 label: Option<&str>,
217 upc: Option<&str>,
218 release_date: Option<&str>,
219 genre: Option<&str>,
220 resource_references: &[String]) -> Result<String, BuildError> {
221 if !self.message_started || self.message_finished {
222 return Err(BuildError::XmlGeneration("Message not in valid state for writing releases".to_string()));
223 }
224
225 let release_ref = self.reference_manager.generate_release_reference(release_id)?;
227
228 let mut release_xml = String::new();
230 release_xml.push_str(" <Release>\n");
231 release_xml.push_str(&format!(" <ReleaseReference>{}</ReleaseReference>\n", release_ref));
232 release_xml.push_str(&format!(" <ReleaseId>{}</ReleaseId>\n", escape_xml(release_id)));
233 release_xml.push_str(" <ReleaseType>Album</ReleaseType>\n");
234 release_xml.push_str(&format!(" <Title>{}</Title>\n", escape_xml(title)));
235 release_xml.push_str(&format!(" <DisplayArtist>{}</DisplayArtist>\n", escape_xml(artist)));
236
237 if let Some(label_value) = label {
238 release_xml.push_str(&format!(" <LabelName>{}</LabelName>\n", escape_xml(label_value)));
239 }
240
241 if let Some(upc_value) = upc {
242 release_xml.push_str(&format!(" <UPC>{}</UPC>\n", escape_xml(upc_value)));
243 }
244
245 if let Some(date_value) = release_date {
246 release_xml.push_str(&format!(" <ReleaseDate>{}</ReleaseDate>\n", escape_xml(date_value)));
247 }
248
249 if let Some(genre_value) = genre {
250 release_xml.push_str(&format!(" <Genre>{}</Genre>\n", escape_xml(genre_value)));
251 }
252
253 if !resource_references.is_empty() {
255 release_xml.push_str(" <ResourceGroup>\n");
256 for resource_ref in resource_references {
257 release_xml.push_str(&format!(" <ResourceReference>{}</ResourceReference>\n", resource_ref));
258 }
259 release_xml.push_str(" </ResourceGroup>\n");
260 }
261
262 release_xml.push_str(" </Release>\n");
263
264 self.xml_buffer.extend_from_slice(release_xml.as_bytes());
265
266 self.releases_written += 1;
267
268 if self.releases_written % self.config.progress_callback_frequency == 0 {
270 self.report_progress();
271 }
272
273 self.flush_if_needed()?;
275
276 Ok(release_ref)
277 }
278
279 pub fn finish_message(&mut self) -> Result<StreamingStats, BuildError> {
281 if !self.message_started || self.message_finished {
282 return Err(BuildError::XmlGeneration("Message not in valid state to finish".to_string()));
283 }
284
285 self.xml_buffer.extend_from_slice(b" </ReleaseList>\n");
287 self.xml_buffer.extend_from_slice(b"</NewReleaseMessage>\n");
288
289 if !self.xml_buffer.is_empty() {
291 self.buffer_manager.write_chunk(&self.xml_buffer)
292 .map_err(|e| BuildError::XmlGeneration(format!("Failed to write final chunk: {}", e)))?;
293 self.xml_buffer.clear();
294 }
295
296 self.buffer_manager.flush_all()
298 .map_err(|e| BuildError::XmlGeneration(format!("Failed to flush: {}", e)))?;
299
300 self.message_finished = true;
301
302 Ok(StreamingStats {
303 releases_written: self.releases_written,
304 resources_written: self.resources_written,
305 deals_written: self.deals_written,
306 bytes_written: self.buffer_manager.total_bytes_written(),
307 warnings: self.warnings.clone(),
308 peak_memory_usage: self.buffer_manager.peak_buffer_size(),
309 })
310 }
311
312 fn write_message_header(&mut self, header: &MessageHeaderRequest) -> Result<(), BuildError> {
315 let default_id = Uuid::new_v4().to_string();
317 let message_id = header.message_id.as_deref()
318 .unwrap_or(&default_id);
319
320 let mut header_xml = String::new();
321 header_xml.push_str(" <MessageHeader>\n");
322 header_xml.push_str(&format!(" <MessageId>{}</MessageId>\n", escape_xml(message_id)));
323
324 header_xml.push_str(" <MessageSender>\n");
326 if !header.message_sender.party_name.is_empty() {
327 header_xml.push_str(&format!(" <PartyName>{}</PartyName>\n",
328 escape_xml(&header.message_sender.party_name[0].text)));
329 }
330 header_xml.push_str(" </MessageSender>\n");
331
332 header_xml.push_str(" <MessageRecipient>\n");
334 if !header.message_recipient.party_name.is_empty() {
335 header_xml.push_str(&format!(" <PartyName>{}</PartyName>\n",
336 escape_xml(&header.message_recipient.party_name[0].text)));
337 }
338 header_xml.push_str(" </MessageRecipient>\n");
339
340 let default_time = chrono::Utc::now().to_rfc3339();
342 let created_time = header.message_created_date_time.as_deref()
343 .unwrap_or(&default_time);
344 header_xml.push_str(&format!(" <MessageCreatedDateTime>{}</MessageCreatedDateTime>\n",
345 escape_xml(created_time)));
346
347 header_xml.push_str(" </MessageHeader>\n");
348
349 self.xml_buffer.extend_from_slice(header_xml.as_bytes());
350 Ok(())
351 }
352
353 fn flush_if_needed(&mut self) -> Result<(), BuildError> {
354 if self.xml_buffer.len() >= self.config.max_buffer_size {
356 self.buffer_manager.write_chunk(&self.xml_buffer)
357 .map_err(|e| BuildError::XmlGeneration(format!("Failed to flush XML buffer: {}", e)))?;
358 self.xml_buffer.clear();
359 }
360 Ok(())
361 }
362
363 fn report_progress(&self) {
364 if let Some(ref callback) = self.progress_callback {
365 let current_memory = self.xml_buffer.len() +
366 self.buffer_manager.current_buffer_size();
367
368 let completion_percent = if let Some(total) = self.estimated_total_items {
369 Some(((self.releases_written + self.resources_written) as f64 / total as f64) * 100.0)
370 } else {
371 None
372 };
373
374 let progress = StreamingProgress {
375 releases_written: self.releases_written,
376 resources_written: self.resources_written,
377 bytes_written: self.buffer_manager.total_bytes_written(),
378 current_memory_usage: current_memory,
379 estimated_completion_percent: completion_percent,
380 };
381
382 callback(progress);
383 }
384 }
385}
386
387#[derive(Debug, Clone)]
389pub struct StreamingStats {
390 pub releases_written: usize,
391 pub resources_written: usize,
392 pub deals_written: usize,
393 pub bytes_written: usize,
394 pub warnings: Vec<BuildWarning>,
395 pub peak_memory_usage: usize,
396}
397
398#[derive(Debug, thiserror::Error)]
400pub enum StreamingError {
401 #[error("Invalid state: {message}")]
402 InvalidState { message: String },
403
404 #[error("I/O error: {0}")]
405 IoError(#[from] std::io::Error),
406
407 #[error("XML writing error: {0}")]
408 XmlError(#[from] quick_xml::Error),
409
410 #[error("Build error: {0}")]
411 BuildError(#[from] BuildError),
412}
413
414impl From<StreamingError> for BuildError {
415 fn from(err: StreamingError) -> Self {
416 match err {
417 StreamingError::InvalidState { message } => BuildError::XmlGeneration(message),
418 StreamingError::IoError(e) => BuildError::XmlGeneration(format!("I/O error: {}", e)),
419 StreamingError::XmlError(e) => BuildError::XmlGeneration(format!("XML error: {}", e)),
420 StreamingError::BuildError(e) => e,
421 }
422 }
423}
424
/// Escapes the five characters XML reserves (`&`, `<`, `>`, `"`, `'`) so that
/// `text` can be embedded safely in element content or in a quoted attribute.
///
/// Each reserved character is replaced by its predefined entity (`&amp;`,
/// `&lt;`, `&gt;`, `&quot;`, `&apos;`); every other character is copied
/// through unchanged. The input is scanned once, so an ampersand introduced by
/// an earlier replacement is never escaped a second time.
fn escape_xml(text: &str) -> String {
    // The output is at least as long as the input.
    let mut escaped = String::with_capacity(text.len());
    for ch in text.chars() {
        match ch {
            '&' => escaped.push_str("&amp;"),
            '<' => escaped.push_str("&lt;"),
            '>' => escaped.push_str("&gt;"),
            '"' => escaped.push_str("&quot;"),
            '\'' => escaped.push_str("&apos;"),
            other => escaped.push(other),
        }
    }
    escaped
}