1use async_trait::async_trait;
29use std::collections::HashMap;
30use std::sync::Arc;
31use thiserror::Error;
32use tokio::sync::RwLock;
33
34use crate::metadata::TableMetadata;
35
36
37#[derive(Debug, Error)]
39pub enum CatalogError {
40 #[error("table not found: {0}")]
42 TableNotFound(String),
43
44 #[error("namespace not found: {0}")]
46 NamespaceNotFound(String),
47
48 #[error("table already exists: {0}")]
50 TableAlreadyExists(String),
51
52 #[error("namespace already exists: {0}")]
54 NamespaceAlreadyExists(String),
55
56 #[error("commit conflict: expected version {expected}, found {actual}")]
58 CommitConflict { expected: i64, actual: i64 },
59
60 #[error("io error: {0}")]
62 Io(#[from] std::io::Error),
63
64 #[error("serialization error: {0}")]
66 Serialization(String),
67}
68
69pub type CatalogResult<T> = Result<T, CatalogError>;
71
/// Identifies a table by a (possibly empty) namespace path plus a table name.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct TableIdentifier {
    /// Namespace levels, outermost first. Empty for a table without namespace.
    pub namespace: Vec<String>,

    /// Table name (the last component of the dotted full name).
    pub name: String,
}

impl TableIdentifier {
    /// Builds an identifier from any sequence of namespace levels and a name.
    pub fn new(
        namespace: impl IntoIterator<Item = impl Into<String>>,
        name: impl Into<String>,
    ) -> Self {
        let levels: Vec<String> = namespace.into_iter().map(Into::into).collect();
        Self {
            namespace: levels,
            name: name.into(),
        }
    }

    /// Builds an identifier whose namespace consists of exactly one level.
    ///
    /// The namespace string is stored verbatim as a single level; it is not
    /// split on `.`.
    pub fn of(namespace: impl Into<String>, name: impl Into<String>) -> Self {
        Self::new(std::iter::once(namespace), name)
    }

    /// Returns the namespace levels and the table name joined with `.`.
    ///
    /// With an empty namespace this is just the table name.
    pub fn full_name(&self) -> String {
        // Joining "levels + name" in one pass yields the bare name when there
        // are no levels, so no special case is required.
        let components: Vec<&str> = self
            .namespace
            .iter()
            .map(String::as_str)
            .chain(std::iter::once(self.name.as_str()))
            .collect();
        components.join(".")
    }

    /// Parses a dotted name: the text after the last `.` becomes the table
    /// name and everything before it is split on `.` into namespace levels.
    ///
    /// Input without any `.` produces an identifier with an empty namespace.
    /// No validation is performed, so empty segments (e.g. from `"a..b"` or
    /// `""`) are kept as empty strings.
    pub fn parse(full_name: &str) -> Self {
        match full_name.rsplit_once('.') {
            Some((prefix, table)) => Self {
                namespace: prefix.split('.').map(String::from).collect(),
                name: String::from(table),
            },
            None => Self {
                namespace: Vec::new(),
                name: String::from(full_name),
            },
        }
    }
}

/// Displays the identifier as its dotted full name.
impl std::fmt::Display for TableIdentifier {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.write_str(&self.full_name())
    }
}
134
/// String key/value properties attached to a namespace.
pub type NamespaceProperties = HashMap<String, String>;
137
138#[async_trait]
140pub trait Catalog: Send + Sync {
141 fn name(&self) -> &str;
143
144 async fn list_namespaces(&self, parent: Option<&[String]>) -> CatalogResult<Vec<Vec<String>>>;
146
147 async fn create_namespace(
149 &self,
150 namespace: &[String],
151 properties: NamespaceProperties,
152 ) -> CatalogResult<()>;
153
154 async fn drop_namespace(&self, namespace: &[String]) -> CatalogResult<()>;
156
157 async fn namespace_properties(
159 &self,
160 namespace: &[String],
161 ) -> CatalogResult<NamespaceProperties>;
162
163 async fn list_tables(&self, namespace: &[String]) -> CatalogResult<Vec<TableIdentifier>>;
165
166 async fn create_table(
168 &self,
169 identifier: &TableIdentifier,
170 metadata: TableMetadata,
171 ) -> CatalogResult<TableMetadata>;
172
173 async fn load_table(&self, identifier: &TableIdentifier) -> CatalogResult<TableMetadata>;
175
176 async fn drop_table(&self, identifier: &TableIdentifier, purge: bool) -> CatalogResult<()>;
178
179 async fn rename_table(&self, from: &TableIdentifier, to: &TableIdentifier)
181 -> CatalogResult<()>;
182
183 async fn table_exists(&self, identifier: &TableIdentifier) -> CatalogResult<bool>;
185
186 async fn commit_table(
198 &self,
199 identifier: &TableIdentifier,
200 base_version: i64,
201 metadata: TableMetadata,
202 ) -> CatalogResult<TableMetadata>;
203}
204
205#[derive(Debug)]
210pub struct InMemoryCatalog {
211 name: String,
212 namespaces: RwLock<HashMap<Vec<String>, NamespaceProperties>>,
213 tables: RwLock<HashMap<TableIdentifier, TableMetadata>>,
214}
215
216impl InMemoryCatalog {
217 pub fn new(name: impl Into<String>) -> Self {
219 Self {
220 name: name.into(),
221 namespaces: RwLock::new(HashMap::new()),
222 tables: RwLock::new(HashMap::new()),
223 }
224 }
225
226 pub fn shared(name: impl Into<String>) -> Arc<Self> {
228 Arc::new(Self::new(name))
229 }
230}
231
232#[async_trait]
233impl Catalog for InMemoryCatalog {
234 fn name(&self) -> &str {
235 &self.name
236 }
237
238 async fn list_namespaces(&self, parent: Option<&[String]>) -> CatalogResult<Vec<Vec<String>>> {
239 let namespaces: tokio::sync::RwLockReadGuard<HashMap<Vec<String>, crate::catalog::NamespaceProperties>> = self.namespaces.read().await;
240 let result: Vec<Vec<String>> = namespaces
241 .keys()
242 .filter(|ns| match parent {
243 Some(p) => ns.starts_with(p) && ns.len() == p.len() + 1,
244 None => ns.len() == 1,
245 })
246 .cloned()
247 .collect();
248 Ok(result)
249 }
250
251 async fn create_namespace(
252 &self,
253 namespace: &[String],
254 properties: NamespaceProperties,
255 ) -> CatalogResult<()> {
256 let mut namespaces: tokio::sync::RwLockWriteGuard<HashMap<Vec<String>, crate::catalog::NamespaceProperties>> = self.namespaces.write().await;
257 let ns_vec = namespace.to_vec();
258
259 if namespaces.contains_key(&ns_vec) {
260 return Err(CatalogError::NamespaceAlreadyExists(namespace.join(".")));
261 }
262
263 namespaces.insert(ns_vec, properties);
264 Ok(())
265 }
266
267 async fn drop_namespace(&self, namespace: &[String]) -> CatalogResult<()> {
268 let mut namespaces = self.namespaces.write().await;
269 let ns_vec = namespace.to_vec();
270
271 if namespaces.remove(&ns_vec).is_none() {
272 return Err(CatalogError::NamespaceNotFound(namespace.join(".")));
273 }
274
275 Ok(())
276 }
277
278 async fn namespace_properties(
279 &self,
280 namespace: &[String],
281 ) -> CatalogResult<NamespaceProperties> {
282 let namespaces = self.namespaces.read().await;
283 namespaces
284 .get(namespace)
285 .cloned()
286 .ok_or_else(|| CatalogError::NamespaceNotFound(namespace.join(".")))
287 }
288
289 async fn list_tables(&self, namespace: &[String]) -> CatalogResult<Vec<TableIdentifier>> {
290 let tables = self.tables.read().await;
291 let result: Vec<TableIdentifier> = tables
292 .keys()
293 .filter(|id| id.namespace == namespace)
294 .cloned()
295 .collect();
296 Ok(result)
297 }
298
299 async fn create_table(
300 &self,
301 identifier: &TableIdentifier,
302 metadata: TableMetadata,
303 ) -> CatalogResult<TableMetadata> {
304 let mut tables = self.tables.write().await;
305
306 if tables.contains_key(identifier) {
307 return Err(CatalogError::TableAlreadyExists(identifier.full_name()));
308 }
309
310 tables.insert(identifier.clone(), metadata.clone());
311 Ok(metadata)
312 }
313
314 async fn load_table(&self, identifier: &TableIdentifier) -> CatalogResult<TableMetadata> {
315 let tables = self.tables.read().await;
316 tables
317 .get(identifier)
318 .cloned()
319 .ok_or_else(|| CatalogError::TableNotFound(identifier.full_name()))
320 }
321
322 async fn drop_table(&self, identifier: &TableIdentifier, _purge: bool) -> CatalogResult<()> {
323 let mut tables = self.tables.write().await;
324
325 if tables.remove(identifier).is_none() {
326 return Err(CatalogError::TableNotFound(identifier.full_name()));
327 }
328
329 Ok(())
330 }
331
332 async fn rename_table(
333 &self,
334 from: &TableIdentifier,
335 to: &TableIdentifier,
336 ) -> CatalogResult<()> {
337 let mut tables = self.tables.write().await;
338
339 let metadata = tables
340 .remove(from)
341 .ok_or_else(|| CatalogError::TableNotFound(from.full_name()))?;
342
343 if tables.contains_key(to) {
344 tables.insert(from.clone(), metadata);
346 return Err(CatalogError::TableAlreadyExists(to.full_name()));
347 }
348
349 tables.insert(to.clone(), metadata);
350 Ok(())
351 }
352
353 async fn table_exists(&self, identifier: &TableIdentifier) -> CatalogResult<bool> {
354 let tables: tokio::sync::RwLockReadGuard<HashMap<TableIdentifier, TableMetadata>> = self.tables.read().await;
355 Ok(tables.contains_key(identifier))
356 }
357
358 async fn commit_table(
359 &self,
360 identifier: &TableIdentifier,
361 base_version: i64,
362 metadata: TableMetadata,
363 ) -> CatalogResult<TableMetadata> {
364 let mut tables: tokio::sync::RwLockWriteGuard<HashMap<TableIdentifier, TableMetadata>> = self.tables.write().await;
365
366 let current = tables
367 .get(identifier)
368 .ok_or_else(|| CatalogError::TableNotFound(identifier.full_name()))?;
369
370 if current.last_sequence_number != base_version {
372 return Err(CatalogError::CommitConflict {
373 expected: base_version,
374 actual: current.last_sequence_number,
375 });
376 }
377
378 tables.insert(identifier.clone(), metadata.clone());
379 Ok(metadata)
380 }
381}
382
383
384
#[cfg(test)]
mod tests {
    use super::*;
    use crate::schema::{Schema, Type};

    /// Builds minimal table metadata (one `id: Long` field) at `location`.
    fn sample_metadata(location: &str) -> TableMetadata {
        let schema = Schema::builder(0)
            .with_field(1, "id", Type::Long, true)
            .build();
        TableMetadata::builder(location, schema).build()
    }

    /// A freshly created namespace shows up in the top-level listing.
    #[tokio::test]
    async fn test_create_namespace() {
        let catalog = InMemoryCatalog::new("test");

        catalog
            .create_namespace(&["db".into()], HashMap::new())
            .await
            .unwrap();

        let namespaces = catalog.list_namespaces(None).await.unwrap();
        assert_eq!(namespaces.len(), 1);
        assert_eq!(namespaces[0], vec!["db".to_string()]);
    }

    /// A created table is reported as existing.
    #[tokio::test]
    async fn test_create_table() {
        let catalog = InMemoryCatalog::new("test");

        let identifier = TableIdentifier::of("db", "users");
        let metadata = sample_metadata("s3://bucket/users");

        catalog.create_table(&identifier, metadata).await.unwrap();

        let exists = catalog.table_exists(&identifier).await.unwrap();
        assert!(exists);
    }

    /// Committing against a stale base version is rejected, and the error
    /// echoes the base version the caller supplied.
    #[tokio::test]
    async fn test_commit_conflict() {
        let catalog = InMemoryCatalog::new("test");

        let identifier = TableIdentifier::of("db", "users");
        let metadata = sample_metadata("s3://bucket/users");

        catalog
            .create_table(&identifier, metadata.clone())
            .await
            .unwrap();

        let result = catalog
            .commit_table(&identifier, 999, metadata.clone())
            .await;

        assert!(matches!(
            result,
            Err(CatalogError::CommitConflict { expected: 999, .. })
        ));
    }

    /// Dotted names split into namespace levels plus a table name.
    // Parsing is synchronous, so no async runtime is needed for this test.
    #[test]
    fn test_table_identifier_parsing() {
        let id = TableIdentifier::parse("db.schema.users");
        assert_eq!(id.namespace, vec!["db", "schema"]);
        assert_eq!(id.name, "users");
        assert_eq!(id.full_name(), "db.schema.users");

        let id2 = TableIdentifier::parse("simple_table");
        assert!(id2.namespace.is_empty());
        assert_eq!(id2.name, "simple_table");
    }
}