1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
use serde_json::Value;
use std::collections::HashMap;
use std::sync::Arc;
use tokio::sync::RwLock;
/// In-memory cache for OCI-backed CloudEvent schemas.
///
/// Entries are JSON values keyed by a schema identifier string. The map lives
/// behind an `Arc<RwLock<..>>`, so cloning a `SchemaCache` produces another
/// handle onto the same entries rather than a copy of them.
#[derive(Clone, Debug)]
pub struct SchemaCache {
    /// Shared map from schema identifier to the cached JSON value.
    schemas: Arc<RwLock<HashMap<String, Value>>>,
}
impl Default for SchemaCache {
fn default() -> Self {
Self::new()
}
}
impl SchemaCache {
    /// Creates a cache that holds no schemas.
    pub fn new() -> Self {
        Self {
            schemas: Arc::new(RwLock::new(HashMap::new())),
        }
    }

    /// Looks up the schema stored under `schema_id`.
    ///
    /// The lookup is an exact match on the key that was given to
    /// [`SchemaCache::insert`]. Returns a clone of the cached value, or `None`
    /// when nothing is stored under that key.
    pub async fn get(&self, schema_id: &str) -> Option<Value> {
        // The read guard is a temporary, so it is released as soon as the
        // clone has been taken.
        self.schemas.read().await.get(schema_id).cloned()
    }

    /// Stores `schema` under `schema_id`, replacing any existing entry for
    /// that key.
    pub async fn insert(&self, schema_id: String, schema: Value) {
        self.schemas.write().await.insert(schema_id, schema);
    }

    /// Starts a background task intended to refresh the cache from an OCI
    /// registry once per hour.
    ///
    /// The fetch itself is not implemented yet: the task currently only wakes
    /// up every 3600 seconds, and `_oci_registry_url` is not used. The call
    /// returns immediately; the task is detached and runs on the ambient
    /// Tokio runtime.
    ///
    /// The task keeps only a weak handle to the cache contents, so it neither
    /// keeps the map alive nor outlives it indefinitely: on the first wake-up
    /// after every `SchemaCache` handle has been dropped, the loop exits.
    ///
    /// # Panics
    ///
    /// Panics if called outside of a Tokio runtime, because `tokio::spawn`
    /// requires one.
    pub fn start_background_sync(&self, _oci_registry_url: String) {
        // A weak handle lets the task notice when the cache is gone instead
        // of pinning the map in memory for the life of the runtime.
        let schemas = Arc::downgrade(&self.schemas);
        tokio::spawn(async move {
            loop {
                match schemas.upgrade() {
                    // Every cache handle has been dropped; nothing to refresh.
                    None => break,
                    Some(_live) => {
                        // TODO: Implement OCI fetch
                        // let fetched_schemas = fetch_schemas_from_oci(&oci_registry_url).await;
                        // for (id, schema) in fetched_schemas {
                        //     _live.write().await.insert(id, schema);
                        // }
                    }
                }
                // The strong handle is released before sleeping so that the
                // cache can be freed while the task is idle.
                tokio::time::sleep(std::time::Duration::from_secs(3600)).await;
            }
        });
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    /// A key is absent until it is inserted; afterwards `get` yields a value
    /// equal to the one that was stored.
    #[tokio::test]
    async fn test_schema_cache_insert_and_get() {
        let key = "test_schema_id";
        let expected = serde_json::json!({"type": "object"});
        let cache = SchemaCache::default();

        // Nothing has been stored yet.
        assert!(cache.get(key).await.is_none());

        cache.insert(key.to_owned(), expected.clone()).await;

        // The stored schema comes back unchanged.
        assert_eq!(cache.get(key).await, Some(expected));
    }

    /// Smoke test for the background sync entry point.
    #[tokio::test]
    async fn test_schema_cache_start_background_sync() {
        // The sync loop is still a stub, so the only thing checked here is
        // that spawning it does not panic.
        SchemaCache::new().start_background_sync(String::from("dummy_url"));
    }
}