Skip to main content

fluss/metadata/
database.rs

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.
17
18use crate::error::Error::JsonSerdeError;
19use crate::error::Result;
20use crate::metadata::JsonSerde;
21use serde::{Deserialize, Serialize};
22use serde_json::{Value, json};
23use std::collections::HashMap;
24
/// Descriptive metadata for a database: an optional comment plus free-form
/// string key/value properties.
///
/// Instances are assembled through [`DatabaseDescriptor::builder`].
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct DatabaseDescriptor {
    /// Optional comment; `None` when no comment was set.
    comment: Option<String>,
    /// Custom properties, keyed by property name.
    custom_properties: HashMap<String, String>,
}
30
/// A database's name and descriptor together with its creation and
/// modification times.
#[derive(Debug, Clone)]
pub struct DatabaseInfo {
    /// Name of the database.
    database_name: String,
    /// Descriptor (comment and custom properties) of the database.
    database_descriptor: DatabaseDescriptor,
    /// Creation time.
    /// NOTE(review): the unit is not visible in this file — presumably epoch milliseconds; confirm.
    created_time: i64,
    /// Last-modification time.
    /// NOTE(review): presumably the same unit as `created_time`; confirm.
    modified_time: i64,
}
38
39impl DatabaseInfo {
40    pub fn new(
41        database_name: String,
42        database_descriptor: DatabaseDescriptor,
43        created_time: i64,
44        modified_time: i64,
45    ) -> Self {
46        Self {
47            database_name,
48            database_descriptor,
49            created_time,
50            modified_time,
51        }
52    }
53
54    pub fn database_name(&self) -> &str {
55        &self.database_name
56    }
57
58    pub fn database_descriptor(&self) -> &DatabaseDescriptor {
59        &self.database_descriptor
60    }
61
62    pub fn created_time(&self) -> i64 {
63        self.created_time
64    }
65
66    pub fn modified_time(&self) -> i64 {
67        self.modified_time
68    }
69}
70
/// Builder for [`DatabaseDescriptor`].
///
/// The derived `Default` starts with no comment and an empty property map.
#[derive(Debug, Default)]
pub struct DatabaseDescriptorBuilder {
    /// Comment to place on the descriptor; `None` until `comment` is called.
    comment: Option<String>,
    /// Properties accumulated so far.
    custom_properties: HashMap<String, String>,
}
76
77impl DatabaseDescriptor {
78    pub fn builder() -> DatabaseDescriptorBuilder {
79        DatabaseDescriptorBuilder::default()
80    }
81
82    pub fn comment(&self) -> Option<&str> {
83        self.comment.as_deref()
84    }
85
86    pub fn custom_properties(&self) -> &HashMap<String, String> {
87        &self.custom_properties
88    }
89}
90
91impl DatabaseDescriptorBuilder {
92    pub fn comment<C: Into<String>>(mut self, comment: C) -> Self {
93        self.comment = Some(comment.into());
94        self
95    }
96
97    pub fn custom_properties<K: Into<String>, V: Into<String>>(
98        mut self,
99        properties: HashMap<K, V>,
100    ) -> Self {
101        for (k, v) in properties {
102            self.custom_properties.insert(k.into(), v.into());
103        }
104        self
105    }
106
107    pub fn custom_property<K: Into<String>, V: Into<String>>(mut self, key: K, value: V) -> Self {
108        self.custom_properties.insert(key.into(), value.into());
109        self
110    }
111
112    pub fn build(self) -> DatabaseDescriptor {
113        DatabaseDescriptor {
114            comment: self.comment,
115            custom_properties: self.custom_properties,
116        }
117    }
118}
119
/// JSON field names and format version used by the `JsonSerde` implementation.
impl DatabaseDescriptor {
    /// JSON key holding the custom properties object.
    const CUSTOM_PROPERTIES_NAME: &'static str = "custom_properties";
    /// JSON key holding the optional comment string.
    const COMMENT_NAME: &'static str = "comment";
    /// JSON key holding the format version number.
    const VERSION_KEY: &'static str = "version";
    /// Version number written under `VERSION_KEY` when serializing.
    const VERSION: u32 = 1;
}
126
127impl JsonSerde for DatabaseDescriptor {
128    fn serialize_json(&self) -> Result<Value> {
129        let mut obj = serde_json::Map::new();
130
131        // Serialize version
132        obj.insert(Self::VERSION_KEY.to_string(), json!(Self::VERSION));
133
134        // Serialize comment if present
135        if let Some(comment) = self.comment() {
136            obj.insert(Self::COMMENT_NAME.to_string(), json!(comment));
137        }
138
139        // Serialize custom properties
140        obj.insert(
141            Self::CUSTOM_PROPERTIES_NAME.to_string(),
142            json!(self.custom_properties()),
143        );
144
145        Ok(Value::Object(obj))
146    }
147
148    fn deserialize_json(node: &Value) -> Result<Self> {
149        let mut builder = DatabaseDescriptor::builder();
150
151        // Deserialize comment if present
152        if let Some(comment_node) = node.get(Self::COMMENT_NAME) {
153            let comment = comment_node
154                .as_str()
155                .ok_or_else(|| JsonSerdeError {
156                    message: format!("{} should be a string", Self::COMMENT_NAME),
157                })?
158                .to_owned();
159            builder = builder.comment(&comment);
160        }
161
162        // Deserialize custom properties directly
163        let custom_properties = if let Some(props_node) = node.get(Self::CUSTOM_PROPERTIES_NAME) {
164            let obj = props_node.as_object().ok_or_else(|| JsonSerdeError {
165                message: "Custom properties should be an object".to_string(),
166            })?;
167
168            let mut properties = HashMap::with_capacity(obj.len());
169            for (key, value) in obj {
170                properties.insert(
171                    key.clone(),
172                    value
173                        .as_str()
174                        .ok_or_else(|| JsonSerdeError {
175                            message: "Property value should be a string".to_string(),
176                        })?
177                        .to_owned(),
178                );
179            }
180            properties
181        } else {
182            HashMap::new()
183        };
184        builder = builder.custom_properties(custom_properties);
185
186        Ok(builder.build())
187    }
188}
189
190impl DatabaseDescriptor {
191    /// Create DatabaseDescriptor from JSON bytes (equivalent to Java's fromJsonBytes)
192    pub fn from_json_bytes(bytes: &[u8]) -> Result<Self> {
193        let json_value: Value = serde_json::from_slice(bytes).map_err(|e| JsonSerdeError {
194            message: format!("Failed to parse JSON: {e}"),
195        })?;
196        Self::deserialize_json(&json_value)
197    }
198
199    /// Convert DatabaseDescriptor to JSON bytes
200    pub fn to_json_bytes(&self) -> Result<Vec<u8>> {
201        let json_value = self.serialize_json()?;
202        serde_json::to_vec(&json_value).map_err(|e| JsonSerdeError {
203            message: format!("Failed to serialize to JSON: {e}"),
204        })
205    }
206}
207
#[cfg(test)]
mod tests {
    use super::*;

    /// A descriptor with a comment and properties survives a bytes round trip.
    #[test]
    fn test_database_descriptor_json_serde() {
        let mut custom_props = HashMap::new();
        custom_props.insert("key1".to_string(), "value1".to_string());
        custom_props.insert("key2".to_string(), "value2".to_string());

        let descriptor = DatabaseDescriptor::builder()
            .comment("Test database")
            .custom_properties(custom_props)
            .build();

        // Test serialization
        let json_bytes = descriptor.to_json_bytes().unwrap();

        // Test deserialization
        let deserialized = DatabaseDescriptor::from_json_bytes(&json_bytes).unwrap();
        assert_eq!(descriptor, deserialized);

        // Check the decoded content itself, not only equality with the input.
        assert_eq!(deserialized.comment(), Some("Test database"));
        assert_eq!(deserialized.custom_properties().len(), 2);
        assert_eq!(
            deserialized.custom_properties().get("key1").map(String::as_str),
            Some("value1")
        );
        assert_eq!(
            deserialized.custom_properties().get("key2").map(String::as_str),
            Some("value2")
        );
    }

    /// A descriptor with nothing set survives a bytes round trip.
    #[test]
    fn test_empty_database_descriptor() {
        let descriptor = DatabaseDescriptor::builder().build();
        let json_bytes = descriptor.to_json_bytes().unwrap();
        let deserialized = DatabaseDescriptor::from_json_bytes(&json_bytes).unwrap();
        assert_eq!(descriptor, deserialized);
        assert_eq!(deserialized.comment(), None);
        assert!(deserialized.custom_properties().is_empty());
    }

    /// The serialized object always has the version and properties keys and
    /// omits the comment key when no comment is set.
    #[test]
    fn test_serialized_json_layout() {
        let value = DatabaseDescriptor::builder()
            .build()
            .serialize_json()
            .unwrap();
        assert_eq!(value.get("version"), Some(&json!(1)));
        assert!(value.get("comment").is_none());
        assert_eq!(value.get("custom_properties"), Some(&json!({})));

        let with_comment = DatabaseDescriptor::builder()
            .comment("c")
            .build()
            .serialize_json()
            .unwrap();
        assert_eq!(with_comment.get("comment"), Some(&json!("c")));
    }

    /// Missing optional keys decode to an empty descriptor.
    #[test]
    fn test_deserialize_missing_fields() {
        let deserialized = DatabaseDescriptor::from_json_bytes(b"{}").unwrap();
        assert_eq!(deserialized, DatabaseDescriptor::builder().build());
    }

    /// Malformed input is reported as an error instead of being accepted.
    #[test]
    fn test_deserialize_rejects_malformed_input() {
        // Not JSON at all.
        assert!(DatabaseDescriptor::from_json_bytes(b"not json").is_err());
        // Comment must be a string.
        assert!(DatabaseDescriptor::from_json_bytes(br#"{"comment": 42}"#).is_err());
        // Custom properties must be an object.
        assert!(DatabaseDescriptor::from_json_bytes(br#"{"custom_properties": []}"#).is_err());
        // Every property value must be a string.
        assert!(
            DatabaseDescriptor::from_json_bytes(br#"{"custom_properties": {"k": 1}}"#).is_err()
        );
    }
}