ddex_builder/streaming/
reference_manager.rs

//! Reference management for streaming DDEX XML generation.
//!
//! Manages stable reference generation and validation during streaming
//! to ensure proper linking between releases and resources.
5
6use crate::error::BuildError;
7use crate::id_generator::{StableHashGenerator, StableHashConfig};
8use indexmap::{IndexMap, IndexSet};
9
/// Tunable settings for reference handling while streaming.
///
/// Derives `PartialEq`/`Eq` so configurations can be compared directly.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ReferenceConfig {
    /// When `true`, references are derived from a hash of the source ID;
    /// when `false`, they are numbered from a running per-kind counter.
    pub deterministic: bool,
    /// Text placed in front of every generated resource reference.
    pub resource_prefix: String,
    /// Text placed in front of every generated release reference.
    pub release_prefix: String,
    /// Text placed in front of every generated deal reference.
    pub deal_prefix: String,
    /// Combined number of tracked entries (reference mappings plus stored
    /// metadata) above which the manager starts evicting its oldest entries.
    pub max_cache_size: usize,
}
24
25impl Default for ReferenceConfig {
26    fn default() -> Self {
27        Self {
28            deterministic: true,
29            resource_prefix: "R".to_string(),
30            release_prefix: "REL".to_string(),
31            deal_prefix: "D".to_string(),
32            max_cache_size: 100_000, // Should handle large catalogs
33        }
34    }
35}
36
/// Metadata recorded for one resource alongside its generated reference.
///
/// Derives `PartialEq`/`Eq` so stored metadata can be compared directly.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ResourceReference {
    /// Generated reference for the resource.
    pub reference_id: String,
    /// Caller-supplied identifier the reference was generated for.
    pub resource_id: String,
    /// Resource title as supplied by the caller.
    pub title: String,
    /// Artist name as supplied by the caller.
    pub artist: String,
    /// Resource kind as supplied by the caller.
    pub resource_type: String,
    /// Ordinal assigned by the manager when the metadata is stored.
    pub sequence_number: usize,
}
47
/// Metadata recorded for one release alongside its generated reference.
///
/// Derives `PartialEq`/`Eq` so stored metadata can be compared directly.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ReleaseReference {
    /// Generated reference for the release.
    pub reference_id: String,
    /// Caller-supplied identifier the reference was generated for.
    pub release_id: String,
    /// Release title as supplied by the caller.
    pub title: String,
    /// Artist name as supplied by the caller.
    pub artist: String,
    /// Resource references listed for the release by the caller.
    pub resource_references: Vec<String>,
    /// Ordinal assigned by the manager when the metadata is stored.
    pub sequence_number: usize,
}
58
59/// Manages references during streaming operations
60pub struct StreamingReferenceManager {
61    config: ReferenceConfig,
62    hash_generator: StableHashGenerator,
63    
64    // Resource tracking
65    resource_references: IndexMap<String, String>, // resource_id -> reference_id
66    resource_metadata: IndexMap<String, ResourceReference>,
67    resource_sequence: usize,
68    
69    // Release tracking
70    release_references: IndexMap<String, String>, // release_id -> reference_id  
71    release_metadata: IndexMap<String, ReleaseReference>,
72    release_sequence: usize,
73    
74    // Deal tracking
75    deal_references: IndexMap<String, String>, // deal_id -> reference_id
76    deal_sequence: usize,
77    
78    // Validation tracking
79    used_references: IndexSet<String>,
80    orphaned_references: Vec<String>,
81    duplicate_resource_ids: IndexSet<String>,
82    duplicate_release_ids: IndexSet<String>,
83    
84    // Memory management
85    references_generated: usize,
86}
87
88impl StreamingReferenceManager {
89    /// Create a new streaming reference manager
90    pub fn new() -> Self {
91        Self::new_with_config(ReferenceConfig::default())
92    }
93    
94    /// Create a new streaming reference manager with custom configuration
95    pub fn new_with_config(config: ReferenceConfig) -> Self {
96        let hash_config = StableHashConfig::default();
97        
98        StreamingReferenceManager {
99            config,
100            hash_generator: StableHashGenerator::new(hash_config),
101            resource_references: IndexMap::new(),
102            resource_metadata: IndexMap::new(),
103            resource_sequence: 1,
104            release_references: IndexMap::new(),
105            release_metadata: IndexMap::new(),
106            release_sequence: 1,
107            deal_references: IndexMap::new(),
108            deal_sequence: 1,
109            used_references: IndexSet::new(),
110            orphaned_references: Vec::new(),
111            duplicate_resource_ids: IndexSet::new(),
112            duplicate_release_ids: IndexSet::new(),
113            references_generated: 0,
114        }
115    }
116    
117    /// Generate a stable reference for a resource
118    pub fn generate_resource_reference(&mut self, resource_id: &str) -> Result<String, BuildError> {
119        // Check for duplicate resource ID
120        if self.resource_references.contains_key(resource_id) {
121            self.duplicate_resource_ids.insert(resource_id.to_string());
122            return Ok(self.resource_references[resource_id].clone());
123        }
124        
125        // Generate stable reference
126        let reference_id = if self.config.deterministic {
127            // Use a simplified hash approach for now
128            use sha2::{Sha256, Digest};
129            let mut hasher = Sha256::new();
130            hasher.update(resource_id.as_bytes());
131            let hash = format!("{:x}", hasher.finalize());
132            format!("{}{}", self.config.resource_prefix, &hash[..8])
133        } else {
134            format!("{}{:06}", self.config.resource_prefix, self.resource_sequence)
135        };
136        
137        // Check for reference collision
138        if self.used_references.contains(&reference_id) {
139            return Err(BuildError::InvalidReference { 
140                reference: reference_id,
141            });
142        }
143        
144        // Store the mapping
145        self.resource_references.insert(resource_id.to_string(), reference_id.clone());
146        self.used_references.insert(reference_id.clone());
147        self.resource_sequence += 1;
148        self.references_generated += 1;
149        
150        // Manage memory usage
151        self.manage_cache_size()?;
152        
153        Ok(reference_id)
154    }
155    
156    /// Generate a stable reference for a release
157    pub fn generate_release_reference(&mut self, release_id: &str) -> Result<String, BuildError> {
158        // Check for duplicate release ID
159        if self.release_references.contains_key(release_id) {
160            self.duplicate_release_ids.insert(release_id.to_string());
161            return Ok(self.release_references[release_id].clone());
162        }
163        
164        // Generate stable reference
165        let reference_id = if self.config.deterministic {
166            // Use a simplified hash approach for now
167            use sha2::{Sha256, Digest};
168            let mut hasher = Sha256::new();
169            hasher.update(release_id.as_bytes());
170            let hash = format!("{:x}", hasher.finalize());
171            format!("{}{}", self.config.release_prefix, &hash[..8])
172        } else {
173            format!("{}{:06}", self.config.release_prefix, self.release_sequence)
174        };
175        
176        // Check for reference collision
177        if self.used_references.contains(&reference_id) {
178            return Err(BuildError::InvalidReference { 
179                reference: reference_id,
180            });
181        }
182        
183        // Store the mapping
184        self.release_references.insert(release_id.to_string(), reference_id.clone());
185        self.used_references.insert(reference_id.clone());
186        self.release_sequence += 1;
187        self.references_generated += 1;
188        
189        // Manage memory usage
190        self.manage_cache_size()?;
191        
192        Ok(reference_id)
193    }
194    
195    /// Generate a stable reference for a deal
196    pub fn generate_deal_reference(&mut self, deal_id: &str) -> Result<String, BuildError> {
197        // Check for existing mapping
198        if let Some(existing_ref) = self.deal_references.get(deal_id) {
199            return Ok(existing_ref.clone());
200        }
201        
202        // Generate stable reference
203        let reference_id = if self.config.deterministic {
204            // Use a simplified hash approach for now
205            use sha2::{Sha256, Digest};
206            let mut hasher = Sha256::new();
207            hasher.update(deal_id.as_bytes());
208            let hash = format!("{:x}", hasher.finalize());
209            format!("{}{}", self.config.deal_prefix, &hash[..8])
210        } else {
211            format!("{}{:06}", self.config.deal_prefix, self.deal_sequence)
212        };
213        
214        // Check for reference collision
215        if self.used_references.contains(&reference_id) {
216            return Err(BuildError::InvalidReference { 
217                reference: reference_id,
218            });
219        }
220        
221        // Store the mapping
222        self.deal_references.insert(deal_id.to_string(), reference_id.clone());
223        self.used_references.insert(reference_id.clone());
224        self.deal_sequence += 1;
225        self.references_generated += 1;
226        
227        // Manage memory usage
228        self.manage_cache_size()?;
229        
230        Ok(reference_id)
231    }
232    
233    /// Store metadata for a resource reference
234    pub fn store_resource_metadata(&mut self, 
235                                   resource_id: &str,
236                                   title: &str,
237                                   artist: &str,
238                                   resource_type: &str) -> Result<(), BuildError> {
239        let reference_id = self.resource_references.get(resource_id)
240            .ok_or_else(|| BuildError::InvalidReference { 
241                reference: format!("Resource {} not found", resource_id) 
242            })?
243            .clone();
244        
245        let metadata = ResourceReference {
246            reference_id: reference_id.clone(),
247            resource_id: resource_id.to_string(),
248            title: title.to_string(),
249            artist: artist.to_string(),
250            resource_type: resource_type.to_string(),
251            sequence_number: self.resource_metadata.len() + 1,
252        };
253        
254        self.resource_metadata.insert(reference_id, metadata);
255        Ok(())
256    }
257    
258    /// Store metadata for a release reference
259    pub fn store_release_metadata(&mut self,
260                                  release_id: &str,
261                                  title: &str, 
262                                  artist: &str,
263                                  resource_references: Vec<String>) -> Result<(), BuildError> {
264        let reference_id = self.release_references.get(release_id)
265            .ok_or_else(|| BuildError::InvalidReference { 
266                reference: format!("Release {} not found", release_id) 
267            })?
268            .clone();
269        
270        // Validate that all resource references exist
271        for resource_ref in &resource_references {
272            if !self.used_references.contains(resource_ref) {
273                self.orphaned_references.push(resource_ref.clone());
274            }
275        }
276        
277        let metadata = ReleaseReference {
278            reference_id: reference_id.clone(),
279            release_id: release_id.to_string(),
280            title: title.to_string(),
281            artist: artist.to_string(),
282            resource_references,
283            sequence_number: self.release_metadata.len() + 1,
284        };
285        
286        self.release_metadata.insert(reference_id, metadata);
287        Ok(())
288    }
289    
290    /// Validate all references at the end of streaming
291    pub fn validate_references(&self) -> ReferenceValidationResult {
292        let mut errors = Vec::new();
293        let mut warnings = Vec::new();
294        
295        // Check for orphaned references
296        if !self.orphaned_references.is_empty() {
297            warnings.push(format!("Found {} orphaned resource references", 
298                                 self.orphaned_references.len()));
299        }
300        
301        // Check for duplicate resource IDs
302        if !self.duplicate_resource_ids.is_empty() {
303            warnings.push(format!("Found {} duplicate resource IDs", 
304                                 self.duplicate_resource_ids.len()));
305        }
306        
307        // Check for duplicate release IDs
308        if !self.duplicate_release_ids.is_empty() {
309            warnings.push(format!("Found {} duplicate release IDs", 
310                                 self.duplicate_release_ids.len()));
311        }
312        
313        // Check reference consistency
314        for (resource_id, reference_id) in &self.resource_references {
315            if !self.used_references.contains(reference_id) {
316                errors.push(format!("Resource reference {} for resource {} not properly tracked", 
317                                   reference_id, resource_id));
318            }
319        }
320        
321        for (release_id, reference_id) in &self.release_references {
322            if !self.used_references.contains(reference_id) {
323                errors.push(format!("Release reference {} for release {} not properly tracked", 
324                                   reference_id, release_id));
325            }
326        }
327        
328        ReferenceValidationResult {
329            is_valid: errors.is_empty(),
330            errors,
331            warnings,
332            total_references: self.references_generated,
333            resource_count: self.resource_references.len(),
334            release_count: self.release_references.len(),
335            deal_count: self.deal_references.len(),
336        }
337    }
338    
339    /// Get statistics about reference management
340    pub fn get_stats(&self) -> ReferenceStats {
341        ReferenceStats {
342            total_references_generated: self.references_generated,
343            resource_references: self.resource_references.len(),
344            release_references: self.release_references.len(),
345            deal_references: self.deal_references.len(),
346            cache_size: self.current_cache_size(),
347            duplicate_resource_ids: self.duplicate_resource_ids.len(),
348            duplicate_release_ids: self.duplicate_release_ids.len(),
349            orphaned_references: self.orphaned_references.len(),
350        }
351    }
352    
353    /// Get a resource reference by resource ID
354    pub fn get_resource_reference(&self, resource_id: &str) -> Option<&str> {
355        self.resource_references.get(resource_id).map(|s| s.as_str())
356    }
357    
358    /// Get a release reference by release ID
359    pub fn get_release_reference(&self, release_id: &str) -> Option<&str> {
360        self.release_references.get(release_id).map(|s| s.as_str())
361    }
362    
363    /// Clear old references to manage memory usage
364    fn manage_cache_size(&mut self) -> Result<(), BuildError> {
365        let current_size = self.current_cache_size();
366        
367        if current_size > self.config.max_cache_size {
368            // Remove oldest 25% of entries to free up memory
369            let to_remove = current_size / 4;
370            
371            // Remove oldest resource references
372            let resource_to_remove = std::cmp::min(to_remove / 2, self.resource_references.len() / 2);
373            for _ in 0..resource_to_remove {
374                if let Some((_resource_id, reference_id)) = self.resource_references.shift_remove_index(0) {
375                    self.resource_metadata.remove(&reference_id);
376                    self.used_references.remove(&reference_id);
377                }
378            }
379            
380            // Remove oldest release references  
381            let release_to_remove = std::cmp::min(to_remove / 2, self.release_references.len() / 2);
382            for _ in 0..release_to_remove {
383                if let Some((_release_id, reference_id)) = self.release_references.shift_remove_index(0) {
384                    self.release_metadata.remove(&reference_id);
385                    self.used_references.remove(&reference_id);
386                }
387            }
388        }
389        
390        Ok(())
391    }
392    
393    fn current_cache_size(&self) -> usize {
394        self.resource_references.len() + 
395        self.release_references.len() + 
396        self.deal_references.len() + 
397        self.resource_metadata.len() + 
398        self.release_metadata.len()
399    }
400}
401
402impl Default for StreamingReferenceManager {
403    fn default() -> Self {
404        Self::new()
405    }
406}
407
/// Outcome of validating the tracked references.
///
/// Derives `PartialEq`/`Eq` so results can be compared directly.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ReferenceValidationResult {
    /// `true` when `errors` is empty.
    pub is_valid: bool,
    /// Consistency problems found in the reference tables.
    pub errors: Vec<String>,
    /// Non-fatal findings such as orphaned references or duplicate IDs.
    pub warnings: Vec<String>,
    /// Number of references generated so far.
    pub total_references: usize,
    /// Number of resource mappings currently tracked.
    pub resource_count: usize,
    /// Number of release mappings currently tracked.
    pub release_count: usize,
    /// Number of deal mappings currently tracked.
    pub deal_count: usize,
}
419
/// Snapshot of the reference manager's counters and table sizes.
///
/// Derives `PartialEq`/`Eq` so snapshots can be compared directly.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ReferenceStats {
    /// Number of references generated so far.
    pub total_references_generated: usize,
    /// Number of resource mappings currently tracked.
    pub resource_references: usize,
    /// Number of release mappings currently tracked.
    pub release_references: usize,
    /// Number of deal mappings currently tracked.
    pub deal_references: usize,
    /// Combined number of entries in the mapping and metadata tables.
    pub cache_size: usize,
    /// Number of distinct resource IDs requested more than once.
    pub duplicate_resource_ids: usize,
    /// Number of distinct release IDs requested more than once.
    pub duplicate_release_ids: usize,
    /// Number of orphaned resource references recorded.
    pub orphaned_references: usize,
}
432
#[cfg(test)]
mod tests {
    use super::*;

    /// Distinct resource IDs get distinct prefixed references; a repeated ID
    /// resolves to the earlier reference and is counted as a duplicate.
    #[test]
    fn test_resource_reference_generation() {
        let mut manager = StreamingReferenceManager::new();

        let ref1 = manager.generate_resource_reference("resource1").unwrap();
        let ref2 = manager.generate_resource_reference("resource2").unwrap();

        assert_ne!(ref1, ref2);
        assert!(ref1.starts_with("R"));
        assert!(ref2.starts_with("R"));
        assert_eq!(manager.get_resource_reference("resource1"), Some(ref1.as_str()));

        // Test duplicate handling
        let ref3 = manager.generate_resource_reference("resource1").unwrap();
        assert_eq!(ref1, ref3);

        let stats = manager.get_stats();
        assert_eq!(stats.resource_references, 2);
        assert_eq!(stats.total_references_generated, 2);
        assert_eq!(stats.duplicate_resource_ids, 1);
    }

    /// Distinct release IDs get distinct prefixed references; a repeated ID
    /// resolves to the earlier reference and is counted as a duplicate.
    #[test]
    fn test_release_reference_generation() {
        let mut manager = StreamingReferenceManager::new();

        let ref1 = manager.generate_release_reference("release1").unwrap();
        let ref2 = manager.generate_release_reference("release2").unwrap();

        assert_ne!(ref1, ref2);
        assert!(ref1.starts_with("REL"));
        assert!(ref2.starts_with("REL"));

        let ref3 = manager.generate_release_reference("release1").unwrap();
        assert_eq!(ref1, ref3);
        assert_eq!(manager.get_stats().duplicate_release_ids, 1);
    }

    /// Metadata is stored under the generated reference, and storing metadata
    /// for an unknown resource ID is rejected.
    #[test]
    fn test_metadata_storage() {
        let mut manager = StreamingReferenceManager::new();

        let resource_ref = manager.generate_resource_reference("resource1").unwrap();
        manager
            .store_resource_metadata("resource1", "Title", "Artist", "SoundRecording")
            .unwrap();

        let metadata = manager.resource_metadata.get(&resource_ref).unwrap();
        assert_eq!(metadata.reference_id, resource_ref);
        assert_eq!(metadata.resource_id, "resource1");
        assert_eq!(metadata.title, "Title");
        assert_eq!(metadata.artist, "Artist");
        assert_eq!(metadata.resource_type, "SoundRecording");
        assert_eq!(metadata.sequence_number, 1);

        assert!(manager
            .store_resource_metadata("missing", "Title", "Artist", "SoundRecording")
            .is_err());
    }

    /// A release that lists only issued resource references validates cleanly.
    #[test]
    fn test_reference_validation() {
        let mut manager = StreamingReferenceManager::new();

        // Add some resources and releases
        let resource_ref = manager.generate_resource_reference("resource1").unwrap();
        let release_ref = manager.generate_release_reference("release1").unwrap();
        assert_eq!(manager.get_release_reference("release1"), Some(release_ref.as_str()));

        // Store metadata with valid resource reference
        manager
            .store_release_metadata("release1", "Album Title", "Artist", vec![resource_ref])
            .unwrap();

        let validation = manager.validate_references();
        assert!(validation.is_valid);
        assert!(validation.errors.is_empty());
        assert!(validation.warnings.is_empty());
        assert_eq!(validation.total_references, 2);
        assert_eq!(validation.resource_count, 1);
        assert_eq!(validation.release_count, 1);
        assert_eq!(validation.deal_count, 0);
    }

    /// A release that lists a reference that was never issued produces an
    /// orphan warning but no validation error.
    #[test]
    fn test_orphaned_references() {
        let mut manager = StreamingReferenceManager::new();

        let release_ref = manager.generate_release_reference("release1").unwrap();
        assert!(release_ref.starts_with("REL"));

        // Store metadata with invalid resource reference
        manager
            .store_release_metadata(
                "release1",
                "Album Title",
                "Artist",
                vec!["R999999".to_string()],
            )
            .unwrap();

        let validation = manager.validate_references();
        assert!(validation.is_valid);
        assert_eq!(
            validation.warnings,
            vec!["Found 1 orphaned resource references".to_string()]
        );
        assert_eq!(manager.get_stats().orphaned_references, 1);
    }
}