1use crate::{
7 error::ApiaryError,
8 registry::{Box as ApiaryBox, Frame, Hive, Registry},
9 storage::StorageBackend,
10 Result,
11};
12use bytes::Bytes;
13use std::sync::Arc;
14use tracing::{debug, info, warn};
15
/// Key prefix under which every registry state snapshot is stored.
const REGISTRY_PREFIX: &str = "_registry";

/// Returns the storage key of the registry snapshot for `version`,
/// e.g. `_registry/state_000042.json`.
///
/// The version is zero-padded to at least six digits; versions that need more
/// digits are written out in full.
fn registry_state_key(version: u64) -> String {
    let file_name = format!("state_{:06}.json", version);
    [REGISTRY_PREFIX, file_name.as_str()].join("/")
}
23
24pub struct RegistryManager {
26 storage: Arc<dyn StorageBackend>,
27}
28
29impl RegistryManager {
30 pub fn new(storage: Arc<dyn StorageBackend>) -> Self {
32 Self { storage }
33 }
34
35 pub async fn load_or_create(&self) -> Result<Registry> {
37 let keys = self.storage.list(REGISTRY_PREFIX).await?;
39
40 if keys.is_empty() {
41 info!("No registry found, creating new registry");
42 return self.create_initial_registry().await;
43 }
44
45 let mut max_version = 0u64;
47 for key in &keys {
48 if let Some(version) = self.extract_version(key) {
49 max_version = max_version.max(version);
50 }
51 }
52
53 if max_version == 0 {
54 warn!("No valid registry files found, creating new registry");
55 return self.create_initial_registry().await;
56 }
57
58 info!("Loading registry version {}", max_version);
59 self.load_version(max_version).await
60 }
61
62 pub async fn load_version(&self, version: u64) -> Result<Registry> {
64 let key = registry_state_key(version);
65 let data = self.storage.get(&key).await?;
66 let registry: Registry = serde_json::from_slice(&data).map_err(|e| {
67 ApiaryError::Serialization(format!("Failed to deserialize registry: {}", e))
68 })?;
69
70 debug!(
71 "Loaded registry version {} with {} hives",
72 registry.version,
73 registry.hives.len()
74 );
75 Ok(registry)
76 }
77
78 pub async fn create_hive(&self, hive_name: &str) -> Result<Registry> {
80 let mut retry_count = 0;
81 const MAX_RETRIES: u32 = 10;
82
83 loop {
84 let mut registry = self.load_or_create().await?;
86
87 if registry.has_hive(hive_name) {
89 info!("Hive '{}' already exists", hive_name);
90 return Ok(registry);
91 }
92
93 let hive = Hive::new();
95 registry.hives.insert(hive_name.to_string(), hive);
96 registry.version = registry.next_version();
97
98 match self.try_commit_registry(®istry).await {
100 Ok(true) => {
101 info!(
102 "Created hive '{}' at registry version {}",
103 hive_name, registry.version
104 );
105 return Ok(registry);
106 }
107 Ok(false) => {
108 retry_count += 1;
110 if retry_count >= MAX_RETRIES {
111 return Err(ApiaryError::Conflict {
112 message: format!("Failed to create hive after {} retries", MAX_RETRIES),
113 });
114 }
115 warn!(
116 "Conflict creating hive '{}', retrying ({}/{})",
117 hive_name, retry_count, MAX_RETRIES
118 );
119 continue;
120 }
121 Err(e) => return Err(e),
122 }
123 }
124 }
125
126 pub async fn create_box(&self, hive_name: &str, box_name: &str) -> Result<Registry> {
128 let mut retry_count = 0;
129 const MAX_RETRIES: u32 = 10;
130
131 loop {
132 let mut registry = self.load_or_create().await?;
134
135 if !registry.has_hive(hive_name) {
137 return Err(ApiaryError::EntityNotFound {
138 entity_type: "Hive".to_string(),
139 name: hive_name.to_string(),
140 });
141 }
142
143 if registry.has_box(hive_name, box_name) {
145 info!("Box '{}.{}' already exists", hive_name, box_name);
146 return Ok(registry);
147 }
148
149 let box_ = ApiaryBox::new();
151 registry
152 .get_hive_mut(hive_name)
153 .unwrap()
154 .boxes
155 .insert(box_name.to_string(), box_);
156 registry.version = registry.next_version();
157
158 match self.try_commit_registry(®istry).await {
160 Ok(true) => {
161 info!(
162 "Created box '{}.{}' at registry version {}",
163 hive_name, box_name, registry.version
164 );
165 return Ok(registry);
166 }
167 Ok(false) => {
168 retry_count += 1;
170 if retry_count >= MAX_RETRIES {
171 return Err(ApiaryError::Conflict {
172 message: format!("Failed to create box after {} retries", MAX_RETRIES),
173 });
174 }
175 warn!(
176 "Conflict creating box '{}.{}', retrying ({}/{})",
177 hive_name, box_name, retry_count, MAX_RETRIES
178 );
179 continue;
180 }
181 Err(e) => return Err(e),
182 }
183 }
184 }
185
186 pub async fn create_frame(
188 &self,
189 hive_name: &str,
190 box_name: &str,
191 frame_name: &str,
192 schema: serde_json::Value,
193 partition_by: Vec<String>,
194 ) -> Result<Registry> {
195 let mut retry_count = 0;
196 const MAX_RETRIES: u32 = 10;
197
198 loop {
199 let mut registry = self.load_or_create().await?;
201
202 if !registry.has_hive(hive_name) {
204 return Err(ApiaryError::EntityNotFound {
205 entity_type: "Hive".to_string(),
206 name: hive_name.to_string(),
207 });
208 }
209 if !registry.has_box(hive_name, box_name) {
210 return Err(ApiaryError::EntityNotFound {
211 entity_type: "Box".to_string(),
212 name: format!("{}.{}", hive_name, box_name),
213 });
214 }
215
216 if registry.has_frame(hive_name, box_name, frame_name) {
218 info!(
219 "Frame '{}.{}.{}' already exists",
220 hive_name, box_name, frame_name
221 );
222 return Ok(registry);
223 }
224
225 let frame = if partition_by.is_empty() {
227 Frame::new(schema.clone())
228 } else {
229 Frame::with_partitioning(schema.clone(), partition_by.clone())
230 };
231
232 registry
233 .get_hive_mut(hive_name)
234 .unwrap()
235 .boxes
236 .get_mut(box_name)
237 .unwrap()
238 .frames
239 .insert(frame_name.to_string(), frame);
240 registry.version = registry.next_version();
241
242 match self.try_commit_registry(®istry).await {
244 Ok(true) => {
245 info!(
246 "Created frame '{}.{}.{}' at registry version {}",
247 hive_name, box_name, frame_name, registry.version
248 );
249 return Ok(registry);
250 }
251 Ok(false) => {
252 retry_count += 1;
254 if retry_count >= MAX_RETRIES {
255 return Err(ApiaryError::Conflict {
256 message: format!(
257 "Failed to create frame after {} retries",
258 MAX_RETRIES
259 ),
260 });
261 }
262 warn!(
263 "Conflict creating frame '{}.{}.{}', retrying ({}/{})",
264 hive_name, box_name, frame_name, retry_count, MAX_RETRIES
265 );
266 continue;
267 }
268 Err(e) => return Err(e),
269 }
270 }
271 }
272
273 pub async fn list_hives(&self) -> Result<Vec<String>> {
275 let registry = self.load_or_create().await?;
276 let mut hives: Vec<String> = registry.hives.keys().cloned().collect();
277 hives.sort();
278 Ok(hives)
279 }
280
281 pub async fn list_boxes(&self, hive_name: &str) -> Result<Vec<String>> {
283 let registry = self.load_or_create().await?;
284
285 let hive = registry
286 .get_hive(hive_name)
287 .ok_or_else(|| ApiaryError::EntityNotFound {
288 entity_type: "Hive".to_string(),
289 name: hive_name.to_string(),
290 })?;
291
292 let mut boxes: Vec<String> = hive.boxes.keys().cloned().collect();
293 boxes.sort();
294 Ok(boxes)
295 }
296
297 pub async fn list_frames(&self, hive_name: &str, box_name: &str) -> Result<Vec<String>> {
299 let registry = self.load_or_create().await?;
300
301 let hive = registry
302 .get_hive(hive_name)
303 .ok_or_else(|| ApiaryError::EntityNotFound {
304 entity_type: "Hive".to_string(),
305 name: hive_name.to_string(),
306 })?;
307
308 let box_ = hive
309 .boxes
310 .get(box_name)
311 .ok_or_else(|| ApiaryError::EntityNotFound {
312 entity_type: "Box".to_string(),
313 name: format!("{}.{}", hive_name, box_name),
314 })?;
315
316 let mut frames: Vec<String> = box_.frames.keys().cloned().collect();
317 frames.sort();
318 Ok(frames)
319 }
320
321 pub async fn get_frame(
323 &self,
324 hive_name: &str,
325 box_name: &str,
326 frame_name: &str,
327 ) -> Result<Frame> {
328 let registry = self.load_or_create().await?;
329
330 registry
331 .get_frame(hive_name, box_name, frame_name)
332 .cloned()
333 .ok_or_else(|| ApiaryError::EntityNotFound {
334 entity_type: "Frame".to_string(),
335 name: format!("{}.{}.{}", hive_name, box_name, frame_name),
336 })
337 }
338
339 async fn create_initial_registry(&self) -> Result<Registry> {
343 let registry = Registry::new();
344
345 match self.try_commit_registry(®istry).await {
346 Ok(true) => {
347 info!("Created initial registry version 1");
348 Ok(registry)
349 }
350 Ok(false) => {
351 info!("Initial registry already created by another node");
353 self.load_version(1).await
354 }
355 Err(e) => Err(e),
356 }
357 }
358
359 async fn try_commit_registry(&self, registry: &Registry) -> Result<bool> {
362 let key = registry_state_key(registry.version);
363 let json = serde_json::to_string_pretty(registry).map_err(|e| {
364 ApiaryError::Serialization(format!("Failed to serialize registry: {}", e))
365 })?;
366
367 let data = Bytes::from(json);
368 self.storage.put_if_not_exists(&key, data).await
369 }
370
371 fn extract_version(&self, key: &str) -> Option<u64> {
373 key.strip_prefix(&format!("{}/state_", REGISTRY_PREFIX))?
375 .strip_suffix(".json")?
376 .parse::<u64>()
377 .ok()
378 }
379}
380
#[cfg(test)]
mod tests {
    use super::*;
    use crate::storage::StorageBackend;
    use async_trait::async_trait;
    use std::collections::HashMap;
    use std::sync::Mutex;

    /// Map-backed `StorageBackend` so `RegistryManager` can be tested without real storage.
    #[derive(Default)]
    struct MemoryBackend {
        data: Mutex<HashMap<String, Bytes>>,
    }

    impl MemoryBackend {
        fn new() -> Self {
            Self::default()
        }
    }

    #[async_trait]
    impl StorageBackend for MemoryBackend {
        async fn put(&self, key: &str, data: Bytes) -> Result<()> {
            let mut objects = self.data.lock().unwrap();
            objects.insert(key.to_string(), data);
            Ok(())
        }

        async fn get(&self, key: &str) -> Result<Bytes> {
            let objects = self.data.lock().unwrap();
            match objects.get(key) {
                Some(bytes) => Ok(bytes.clone()),
                None => Err(ApiaryError::NotFound {
                    key: key.to_string(),
                }),
            }
        }

        async fn list(&self, prefix: &str) -> Result<Vec<String>> {
            let objects = self.data.lock().unwrap();
            let matching: Vec<String> = objects
                .keys()
                .filter(|candidate| candidate.starts_with(prefix))
                .cloned()
                .collect();
            Ok(matching)
        }

        async fn delete(&self, key: &str) -> Result<()> {
            let mut objects = self.data.lock().unwrap();
            objects.remove(key);
            Ok(())
        }

        async fn put_if_not_exists(&self, key: &str, data: Bytes) -> Result<bool> {
            let mut objects = self.data.lock().unwrap();
            if objects.contains_key(key) {
                return Ok(false);
            }
            objects.insert(key.to_string(), data);
            Ok(true)
        }

        async fn exists(&self, key: &str) -> Result<bool> {
            let objects = self.data.lock().unwrap();
            Ok(objects.contains_key(key))
        }
    }

    /// Builds a manager over a fresh, empty in-memory store.
    fn fresh_manager() -> RegistryManager {
        RegistryManager::new(Arc::new(MemoryBackend::new()))
    }

    /// Builds a manager that already contains hive `analytics` with box `sensors`.
    async fn manager_with_sensors_box() -> RegistryManager {
        let manager = fresh_manager();
        manager.create_hive("analytics").await.unwrap();
        manager.create_box("analytics", "sensors").await.unwrap();
        manager
    }

    #[tokio::test]
    async fn test_create_initial_registry() {
        let manager = fresh_manager();

        let initial = manager.load_or_create().await.unwrap();

        assert_eq!(initial.version, 1);
        assert_eq!(initial.hives.len(), 0);
    }

    #[tokio::test]
    async fn test_create_hive() {
        let manager = fresh_manager();

        let after = manager.create_hive("analytics").await.unwrap();

        assert_eq!(after.version, 2);
        assert!(after.has_hive("analytics"));
    }

    #[tokio::test]
    async fn test_create_hive_idempotent() {
        let manager = fresh_manager();

        let first = manager.create_hive("analytics").await.unwrap();
        let second = manager.create_hive("analytics").await.unwrap();

        assert_eq!(first.version, second.version);
    }

    #[tokio::test]
    async fn test_create_box() {
        let manager = fresh_manager();
        manager.create_hive("analytics").await.unwrap();

        let after = manager.create_box("analytics", "sensors").await.unwrap();

        assert!(after.has_box("analytics", "sensors"));
    }

    #[tokio::test]
    async fn test_create_box_hive_not_found() {
        let manager = fresh_manager();

        let outcome = manager.create_box("analytics", "sensors").await;

        assert!(matches!(
            outcome,
            Err(ApiaryError::EntityNotFound { .. })
        ));
    }

    #[tokio::test]
    async fn test_create_frame() {
        let manager = manager_with_sensors_box().await;
        let schema = serde_json::json!({
            "fields": [
                {"name": "temperature", "type": "float"},
                {"name": "timestamp", "type": "timestamp"}
            ]
        });

        let after = manager
            .create_frame("analytics", "sensors", "temperature", schema, vec![])
            .await
            .unwrap();

        assert!(after.has_frame("analytics", "sensors", "temperature"));
    }

    #[tokio::test]
    async fn test_create_frame_with_partitioning() {
        let manager = manager_with_sensors_box().await;
        let schema = serde_json::json!({"fields": []});
        let partition_by = vec!["region".to_string(), "date".to_string()];

        manager
            .create_frame(
                "analytics",
                "sensors",
                "temperature",
                schema,
                partition_by.clone(),
            )
            .await
            .unwrap();
        let stored = manager
            .get_frame("analytics", "sensors", "temperature")
            .await
            .unwrap();

        assert_eq!(stored.partition_by, partition_by);
    }

    #[tokio::test]
    async fn test_list_hives() {
        let manager = fresh_manager();
        for hive in ["analytics", "production"] {
            manager.create_hive(hive).await.unwrap();
        }

        let listed = manager.list_hives().await.unwrap();

        assert_eq!(listed, vec!["analytics", "production"]);
    }

    #[tokio::test]
    async fn test_list_boxes() {
        let manager = fresh_manager();
        manager.create_hive("analytics").await.unwrap();
        for box_name in ["sensors", "metrics"] {
            manager.create_box("analytics", box_name).await.unwrap();
        }

        let listed = manager.list_boxes("analytics").await.unwrap();

        assert_eq!(listed, vec!["metrics", "sensors"]);
    }

    #[tokio::test]
    async fn test_list_frames() {
        let manager = manager_with_sensors_box().await;
        let schema = serde_json::json!({"fields": []});
        for frame_name in ["temperature", "humidity"] {
            manager
                .create_frame("analytics", "sensors", frame_name, schema.clone(), vec![])
                .await
                .unwrap();
        }

        let listed = manager.list_frames("analytics", "sensors").await.unwrap();

        assert_eq!(listed, vec!["humidity", "temperature"]);
    }

    #[tokio::test]
    async fn test_get_frame() {
        let manager = manager_with_sensors_box().await;
        let schema = serde_json::json!({
            "fields": [{"name": "value", "type": "float"}]
        });

        manager
            .create_frame(
                "analytics",
                "sensors",
                "temperature",
                schema.clone(),
                vec![],
            )
            .await
            .unwrap();
        let stored = manager
            .get_frame("analytics", "sensors", "temperature")
            .await
            .unwrap();

        assert_eq!(stored.schema, schema);
    }

    #[tokio::test]
    async fn test_extract_version() {
        let manager = fresh_manager();
        let cases = [
            ("_registry/state_000001.json", Some(1u64)),
            ("_registry/state_000042.json", Some(42)),
            ("_registry/state_999999.json", Some(999999)),
            ("invalid", None),
        ];

        for (key, expected) in cases {
            assert_eq!(manager.extract_version(key), expected, "key: {}", key);
        }
    }
}