Skip to main content

apiary_core/
registry_manager.rs

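//! Registry manager for DDL operations and persistence.
//!
//! The registry manager handles creating, reading, and updating the registry.
//! Every update is persisted as a new versioned state file using a conditional
//! (put-if-not-exists) write, so concurrent writers cannot overwrite each other.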
5
6use crate::{
7    error::ApiaryError,
8    registry::{Box as ApiaryBox, Frame, Hive, Registry},
9    storage::StorageBackend,
10    Result,
11};
12use bytes::Bytes;
13use std::sync::Arc;
14use tracing::{debug, info, warn};
15
/// Key prefix under which every registry state file is stored.
const REGISTRY_PREFIX: &str = "_registry";

/// Builds the storage key of the registry state file for `version`.
///
/// The version is zero-padded to at least six digits, e.g. version `1` maps
/// to `_registry/state_000001.json`. Larger versions simply use more digits.
fn registry_state_key(version: u64) -> String {
    format!(
        "{prefix}/state_{number:06}.json",
        prefix = REGISTRY_PREFIX,
        number = version
    )
}
23
24/// Registry manager for DDL operations.
25pub struct RegistryManager {
26    storage: Arc<dyn StorageBackend>,
27}
28
29impl RegistryManager {
30    /// Create a new registry manager.
31    pub fn new(storage: Arc<dyn StorageBackend>) -> Self {
32        Self { storage }
33    }
34
35    /// Load the latest registry from storage, or create a new one if none exists.
36    pub async fn load_or_create(&self) -> Result<Registry> {
37        // List all registry state files
38        let keys = self.storage.list(REGISTRY_PREFIX).await?;
39
40        if keys.is_empty() {
41            info!("No registry found, creating new registry");
42            return self.create_initial_registry().await;
43        }
44
45        // Find the latest version
46        let mut max_version = 0u64;
47        for key in &keys {
48            if let Some(version) = self.extract_version(key) {
49                max_version = max_version.max(version);
50            }
51        }
52
53        if max_version == 0 {
54            warn!("No valid registry files found, creating new registry");
55            return self.create_initial_registry().await;
56        }
57
58        info!("Loading registry version {}", max_version);
59        self.load_version(max_version).await
60    }
61
62    /// Load a specific version of the registry.
63    pub async fn load_version(&self, version: u64) -> Result<Registry> {
64        let key = registry_state_key(version);
65        let data = self.storage.get(&key).await?;
66        let registry: Registry = serde_json::from_slice(&data).map_err(|e| {
67            ApiaryError::Serialization(format!("Failed to deserialize registry: {}", e))
68        })?;
69
70        debug!(
71            "Loaded registry version {} with {} hives",
72            registry.version,
73            registry.hives.len()
74        );
75        Ok(registry)
76    }
77
78    /// Create a hive in the registry.
79    pub async fn create_hive(&self, hive_name: &str) -> Result<Registry> {
80        let mut retry_count = 0;
81        const MAX_RETRIES: u32 = 10;
82
83        loop {
84            // Load the current registry
85            let mut registry = self.load_or_create().await?;
86
87            // Check if hive already exists (idempotency)
88            if registry.has_hive(hive_name) {
89                info!("Hive '{}' already exists", hive_name);
90                return Ok(registry);
91            }
92
93            // Add the new hive
94            let hive = Hive::new();
95            registry.hives.insert(hive_name.to_string(), hive);
96            registry.version = registry.next_version();
97
98            // Try to write the new version
99            match self.try_commit_registry(&registry).await {
100                Ok(true) => {
101                    info!(
102                        "Created hive '{}' at registry version {}",
103                        hive_name, registry.version
104                    );
105                    return Ok(registry);
106                }
107                Ok(false) => {
108                    // Conflict - another writer won, retry
109                    retry_count += 1;
110                    if retry_count >= MAX_RETRIES {
111                        return Err(ApiaryError::Conflict {
112                            message: format!("Failed to create hive after {} retries", MAX_RETRIES),
113                        });
114                    }
115                    warn!(
116                        "Conflict creating hive '{}', retrying ({}/{})",
117                        hive_name, retry_count, MAX_RETRIES
118                    );
119                    continue;
120                }
121                Err(e) => return Err(e),
122            }
123        }
124    }
125
126    /// Create a box within a hive.
127    pub async fn create_box(&self, hive_name: &str, box_name: &str) -> Result<Registry> {
128        let mut retry_count = 0;
129        const MAX_RETRIES: u32 = 10;
130
131        loop {
132            // Load the current registry
133            let mut registry = self.load_or_create().await?;
134
135            // Check if hive exists
136            if !registry.has_hive(hive_name) {
137                return Err(ApiaryError::EntityNotFound {
138                    entity_type: "Hive".to_string(),
139                    name: hive_name.to_string(),
140                });
141            }
142
143            // Check if box already exists (idempotency)
144            if registry.has_box(hive_name, box_name) {
145                info!("Box '{}.{}' already exists", hive_name, box_name);
146                return Ok(registry);
147            }
148
149            // Add the new box
150            let box_ = ApiaryBox::new();
151            registry
152                .get_hive_mut(hive_name)
153                .unwrap()
154                .boxes
155                .insert(box_name.to_string(), box_);
156            registry.version = registry.next_version();
157
158            // Try to write the new version
159            match self.try_commit_registry(&registry).await {
160                Ok(true) => {
161                    info!(
162                        "Created box '{}.{}' at registry version {}",
163                        hive_name, box_name, registry.version
164                    );
165                    return Ok(registry);
166                }
167                Ok(false) => {
168                    // Conflict - another writer won, retry
169                    retry_count += 1;
170                    if retry_count >= MAX_RETRIES {
171                        return Err(ApiaryError::Conflict {
172                            message: format!("Failed to create box after {} retries", MAX_RETRIES),
173                        });
174                    }
175                    warn!(
176                        "Conflict creating box '{}.{}', retrying ({}/{})",
177                        hive_name, box_name, retry_count, MAX_RETRIES
178                    );
179                    continue;
180                }
181                Err(e) => return Err(e),
182            }
183        }
184    }
185
186    /// Create a frame within a box.
187    pub async fn create_frame(
188        &self,
189        hive_name: &str,
190        box_name: &str,
191        frame_name: &str,
192        schema: serde_json::Value,
193        partition_by: Vec<String>,
194    ) -> Result<Registry> {
195        let mut retry_count = 0;
196        const MAX_RETRIES: u32 = 10;
197
198        loop {
199            // Load the current registry
200            let mut registry = self.load_or_create().await?;
201
202            // Check if hive and box exist
203            if !registry.has_hive(hive_name) {
204                return Err(ApiaryError::EntityNotFound {
205                    entity_type: "Hive".to_string(),
206                    name: hive_name.to_string(),
207                });
208            }
209            if !registry.has_box(hive_name, box_name) {
210                return Err(ApiaryError::EntityNotFound {
211                    entity_type: "Box".to_string(),
212                    name: format!("{}.{}", hive_name, box_name),
213                });
214            }
215
216            // Check if frame already exists (idempotency)
217            if registry.has_frame(hive_name, box_name, frame_name) {
218                info!(
219                    "Frame '{}.{}.{}' already exists",
220                    hive_name, box_name, frame_name
221                );
222                return Ok(registry);
223            }
224
225            // Add the new frame
226            let frame = if partition_by.is_empty() {
227                Frame::new(schema.clone())
228            } else {
229                Frame::with_partitioning(schema.clone(), partition_by.clone())
230            };
231
232            registry
233                .get_hive_mut(hive_name)
234                .unwrap()
235                .boxes
236                .get_mut(box_name)
237                .unwrap()
238                .frames
239                .insert(frame_name.to_string(), frame);
240            registry.version = registry.next_version();
241
242            // Try to write the new version
243            match self.try_commit_registry(&registry).await {
244                Ok(true) => {
245                    info!(
246                        "Created frame '{}.{}.{}' at registry version {}",
247                        hive_name, box_name, frame_name, registry.version
248                    );
249                    return Ok(registry);
250                }
251                Ok(false) => {
252                    // Conflict - another writer won, retry
253                    retry_count += 1;
254                    if retry_count >= MAX_RETRIES {
255                        return Err(ApiaryError::Conflict {
256                            message: format!(
257                                "Failed to create frame after {} retries",
258                                MAX_RETRIES
259                            ),
260                        });
261                    }
262                    warn!(
263                        "Conflict creating frame '{}.{}.{}', retrying ({}/{})",
264                        hive_name, box_name, frame_name, retry_count, MAX_RETRIES
265                    );
266                    continue;
267                }
268                Err(e) => return Err(e),
269            }
270        }
271    }
272
273    /// List all hives.
274    pub async fn list_hives(&self) -> Result<Vec<String>> {
275        let registry = self.load_or_create().await?;
276        let mut hives: Vec<String> = registry.hives.keys().cloned().collect();
277        hives.sort();
278        Ok(hives)
279    }
280
281    /// List all boxes in a hive.
282    pub async fn list_boxes(&self, hive_name: &str) -> Result<Vec<String>> {
283        let registry = self.load_or_create().await?;
284
285        let hive = registry
286            .get_hive(hive_name)
287            .ok_or_else(|| ApiaryError::EntityNotFound {
288                entity_type: "Hive".to_string(),
289                name: hive_name.to_string(),
290            })?;
291
292        let mut boxes: Vec<String> = hive.boxes.keys().cloned().collect();
293        boxes.sort();
294        Ok(boxes)
295    }
296
297    /// List all frames in a box.
298    pub async fn list_frames(&self, hive_name: &str, box_name: &str) -> Result<Vec<String>> {
299        let registry = self.load_or_create().await?;
300
301        let hive = registry
302            .get_hive(hive_name)
303            .ok_or_else(|| ApiaryError::EntityNotFound {
304                entity_type: "Hive".to_string(),
305                name: hive_name.to_string(),
306            })?;
307
308        let box_ = hive
309            .boxes
310            .get(box_name)
311            .ok_or_else(|| ApiaryError::EntityNotFound {
312                entity_type: "Box".to_string(),
313                name: format!("{}.{}", hive_name, box_name),
314            })?;
315
316        let mut frames: Vec<String> = box_.frames.keys().cloned().collect();
317        frames.sort();
318        Ok(frames)
319    }
320
321    /// Get a frame's metadata.
322    pub async fn get_frame(
323        &self,
324        hive_name: &str,
325        box_name: &str,
326        frame_name: &str,
327    ) -> Result<Frame> {
328        let registry = self.load_or_create().await?;
329
330        registry
331            .get_frame(hive_name, box_name, frame_name)
332            .cloned()
333            .ok_or_else(|| ApiaryError::EntityNotFound {
334                entity_type: "Frame".to_string(),
335                name: format!("{}.{}.{}", hive_name, box_name, frame_name),
336            })
337    }
338
339    // Private helper methods
340
341    /// Create the initial registry and persist it.
342    async fn create_initial_registry(&self) -> Result<Registry> {
343        let registry = Registry::new();
344
345        match self.try_commit_registry(&registry).await {
346            Ok(true) => {
347                info!("Created initial registry version 1");
348                Ok(registry)
349            }
350            Ok(false) => {
351                // Someone else created it first, load it
352                info!("Initial registry already created by another node");
353                self.load_version(1).await
354            }
355            Err(e) => Err(e),
356        }
357    }
358
359    /// Try to commit a registry using conditional write.
360    /// Returns Ok(true) if successful, Ok(false) if key already exists.
361    async fn try_commit_registry(&self, registry: &Registry) -> Result<bool> {
362        let key = registry_state_key(registry.version);
363        let json = serde_json::to_string_pretty(registry).map_err(|e| {
364            ApiaryError::Serialization(format!("Failed to serialize registry: {}", e))
365        })?;
366
367        let data = Bytes::from(json);
368        self.storage.put_if_not_exists(&key, data).await
369    }
370
371    /// Extract version number from a registry state key.
372    fn extract_version(&self, key: &str) -> Option<u64> {
373        // Expected format: _registry/state_000001.json
374        key.strip_prefix(&format!("{}/state_", REGISTRY_PREFIX))?
375            .strip_suffix(".json")?
376            .parse::<u64>()
377            .ok()
378    }
379}
380
#[cfg(test)]
mod tests {
    use super::*;
    use crate::storage::StorageBackend;
    use async_trait::async_trait;
    use std::collections::hash_map::Entry;
    use std::collections::HashMap;
    use std::sync::Mutex;

    /// Minimal in-memory storage backend: a mutex-guarded map from key to bytes.
    struct MemoryBackend {
        objects: Mutex<HashMap<String, Bytes>>,
    }

    impl MemoryBackend {
        fn new() -> Self {
            let objects = Mutex::new(HashMap::new());
            Self { objects }
        }
    }

    #[async_trait]
    impl StorageBackend for MemoryBackend {
        async fn put(&self, key: &str, data: Bytes) -> Result<()> {
            let mut objects = self.objects.lock().unwrap();
            objects.insert(key.to_string(), data);
            Ok(())
        }

        async fn get(&self, key: &str) -> Result<Bytes> {
            let objects = self.objects.lock().unwrap();
            match objects.get(key) {
                Some(bytes) => Ok(bytes.clone()),
                None => Err(ApiaryError::NotFound {
                    key: key.to_string(),
                }),
            }
        }

        async fn list(&self, prefix: &str) -> Result<Vec<String>> {
            let objects = self.objects.lock().unwrap();
            let mut matching = Vec::new();
            for key in objects.keys() {
                if key.starts_with(prefix) {
                    matching.push(key.clone());
                }
            }
            Ok(matching)
        }

        async fn delete(&self, key: &str) -> Result<()> {
            let mut objects = self.objects.lock().unwrap();
            objects.remove(key);
            Ok(())
        }

        async fn put_if_not_exists(&self, key: &str, data: Bytes) -> Result<bool> {
            let mut objects = self.objects.lock().unwrap();
            match objects.entry(key.to_string()) {
                Entry::Occupied(_) => Ok(false),
                Entry::Vacant(slot) => {
                    slot.insert(data);
                    Ok(true)
                }
            }
        }

        async fn exists(&self, key: &str) -> Result<bool> {
            let objects = self.objects.lock().unwrap();
            Ok(objects.contains_key(key))
        }
    }

    /// Manager backed by a fresh, empty in-memory store.
    fn fresh_manager() -> RegistryManager {
        RegistryManager::new(Arc::new(MemoryBackend::new()))
    }

    /// Manager whose registry already contains hive `analytics` with box `sensors`.
    async fn manager_with_sensors_box() -> RegistryManager {
        let manager = fresh_manager();
        manager.create_hive("analytics").await.unwrap();
        manager.create_box("analytics", "sensors").await.unwrap();
        manager
    }

    #[tokio::test]
    async fn test_create_initial_registry() {
        let registry = fresh_manager().load_or_create().await.unwrap();

        assert_eq!(registry.version, 1);
        assert_eq!(registry.hives.len(), 0);
    }

    #[tokio::test]
    async fn test_create_hive() {
        let registry = fresh_manager().create_hive("analytics").await.unwrap();

        assert_eq!(registry.version, 2);
        assert!(registry.has_hive("analytics"));
    }

    #[tokio::test]
    async fn test_create_hive_idempotent() {
        let manager = fresh_manager();

        let first = manager.create_hive("analytics").await.unwrap();
        let second = manager.create_hive("analytics").await.unwrap();

        // Should not increment version on second call
        assert_eq!(first.version, second.version);
    }

    #[tokio::test]
    async fn test_create_box() {
        let manager = fresh_manager();
        manager.create_hive("analytics").await.unwrap();

        let registry = manager.create_box("analytics", "sensors").await.unwrap();

        assert!(registry.has_box("analytics", "sensors"));
    }

    #[tokio::test]
    async fn test_create_box_hive_not_found() {
        let outcome = fresh_manager().create_box("analytics", "sensors").await;

        assert!(outcome.is_err());
        let error = outcome.unwrap_err();
        assert!(matches!(error, ApiaryError::EntityNotFound { .. }));
    }

    #[tokio::test]
    async fn test_create_frame() {
        let manager = manager_with_sensors_box().await;
        let schema = serde_json::json!({
            "fields": [
                {"name": "temperature", "type": "float"},
                {"name": "timestamp", "type": "timestamp"}
            ]
        });

        let registry = manager
            .create_frame("analytics", "sensors", "temperature", schema, Vec::new())
            .await
            .unwrap();

        assert!(registry.has_frame("analytics", "sensors", "temperature"));
    }

    #[tokio::test]
    async fn test_create_frame_with_partitioning() {
        let manager = manager_with_sensors_box().await;
        let partition_by = vec!["region".to_string(), "date".to_string()];

        manager
            .create_frame(
                "analytics",
                "sensors",
                "temperature",
                serde_json::json!({"fields": []}),
                partition_by.clone(),
            )
            .await
            .unwrap();

        let frame = manager
            .get_frame("analytics", "sensors", "temperature")
            .await
            .unwrap();
        assert_eq!(frame.partition_by, partition_by);
    }

    #[tokio::test]
    async fn test_list_hives() {
        let manager = fresh_manager();
        for hive_name in ["analytics", "production"] {
            manager.create_hive(hive_name).await.unwrap();
        }

        let hives = manager.list_hives().await.unwrap();

        assert_eq!(hives, vec!["analytics", "production"]);
    }

    #[tokio::test]
    async fn test_list_boxes() {
        let manager = manager_with_sensors_box().await;
        manager.create_box("analytics", "metrics").await.unwrap();

        let boxes = manager.list_boxes("analytics").await.unwrap();

        assert_eq!(boxes, vec!["metrics", "sensors"]);
    }

    #[tokio::test]
    async fn test_list_frames() {
        let manager = manager_with_sensors_box().await;
        let schema = serde_json::json!({"fields": []});

        for frame_name in ["temperature", "humidity"] {
            manager
                .create_frame(
                    "analytics",
                    "sensors",
                    frame_name,
                    schema.clone(),
                    Vec::new(),
                )
                .await
                .unwrap();
        }

        let frames = manager.list_frames("analytics", "sensors").await.unwrap();
        assert_eq!(frames, vec!["humidity", "temperature"]);
    }

    #[tokio::test]
    async fn test_get_frame() {
        let manager = manager_with_sensors_box().await;
        let schema = serde_json::json!({
            "fields": [{"name": "value", "type": "float"}]
        });

        manager
            .create_frame(
                "analytics",
                "sensors",
                "temperature",
                schema.clone(),
                Vec::new(),
            )
            .await
            .unwrap();

        let frame = manager
            .get_frame("analytics", "sensors", "temperature")
            .await
            .unwrap();
        assert_eq!(frame.schema, schema);
    }

    #[tokio::test]
    async fn test_extract_version() {
        let manager = fresh_manager();

        let cases: [(&str, Option<u64>); 4] = [
            ("_registry/state_000001.json", Some(1)),
            ("_registry/state_000042.json", Some(42)),
            ("_registry/state_999999.json", Some(999999)),
            ("invalid", None),
        ];

        for (key, expected) in cases {
            assert_eq!(manager.extract_version(key), expected);
        }
    }
}