1use chrono::{DateTime, Utc};
2use serde::{Deserialize, Serialize};
3use sha2::{Digest, Sha256};
4
5use crate::dataset::error::DatasetError;
6
7fn validate_namespace_component(name: &str, label: &str) -> Result<(), DatasetError> {
8 if name.is_empty() {
9 return Err(DatasetError::SchemaParseError(format!(
10 "{label} must not be empty"
11 )));
12 }
13 if name.contains('/') || name.contains('.') || name.contains('"') {
14 return Err(DatasetError::SchemaParseError(format!(
15 "{label} must not contain '/', '.', or '\"'"
16 )));
17 }
18 Ok(())
19}
20
21#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
22pub struct DatasetNamespace {
23 pub catalog: String,
24 pub schema_name: String,
25 pub table: String,
26}
27
28impl DatasetNamespace {
29 pub fn new(
30 catalog: impl Into<String>,
31 schema_name: impl Into<String>,
32 table: impl Into<String>,
33 ) -> Result<Self, DatasetError> {
34 let catalog = catalog.into();
35 let schema_name = schema_name.into();
36 let table = table.into();
37 validate_namespace_component(&catalog, "catalog")?;
38 validate_namespace_component(&schema_name, "schema_name")?;
39 validate_namespace_component(&table, "table")?;
40 Ok(Self {
41 catalog,
42 schema_name,
43 table,
44 })
45 }
46
47 pub fn fqn(&self) -> String {
48 format!("{}.{}.{}", self.catalog, self.schema_name, self.table)
49 }
50
51 pub fn quoted_fqn(&self) -> String {
52 format!(
53 "\"{}\".\"{}\".\"{}\"",
54 self.catalog, self.schema_name, self.table
55 )
56 }
57
58 pub fn storage_path(&self) -> String {
59 format!(
60 "datasets/{}/{}/{}",
61 self.catalog, self.schema_name, self.table
62 )
63 }
64}
65
66#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
67pub struct DatasetFingerprint(pub String);
68
69impl DatasetFingerprint {
70 pub fn from_schema_json(arrow_schema_json: &str) -> Self {
73 let mut hasher = Sha256::new();
74 hasher.update(arrow_schema_json.as_bytes());
75 let hash = hasher.finalize();
76 let hex = hex::encode(hash);
77 DatasetFingerprint(hex[..32].to_string())
78 }
79
80 pub fn as_str(&self) -> &str {
81 &self.0
82 }
83}
84
85impl std::fmt::Display for DatasetFingerprint {
86 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
87 write!(f, "{}", self.0)
88 }
89}
90
91#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
92#[serde(rename_all = "lowercase")]
93pub enum DatasetStatus {
94 Active,
95 Deprecated,
96}
97
98impl std::fmt::Display for DatasetStatus {
99 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
100 match self {
101 DatasetStatus::Active => write!(f, "active"),
102 DatasetStatus::Deprecated => write!(f, "deprecated"),
103 }
104 }
105}
106
107impl std::str::FromStr for DatasetStatus {
108 type Err = DatasetError;
109
110 fn from_str(s: &str) -> Result<Self, Self::Err> {
111 match s {
112 "active" => Ok(DatasetStatus::Active),
113 "deprecated" => Ok(DatasetStatus::Deprecated),
114 other => Err(DatasetError::SchemaParseError(format!(
115 "Unknown dataset status: '{}'",
116 other
117 ))),
118 }
119 }
120}
121
122#[derive(Debug, Clone, Serialize, Deserialize)]
123pub struct DatasetRegistration {
124 pub namespace: DatasetNamespace,
125 pub fingerprint: DatasetFingerprint,
126 pub arrow_schema_json: String,
128 pub json_schema: String,
130 pub partition_columns: Vec<String>,
132 pub created_at: DateTime<Utc>,
133 pub updated_at: DateTime<Utc>,
134 pub status: DatasetStatus,
135}
136
137impl DatasetRegistration {
138 pub fn new(
139 namespace: DatasetNamespace,
140 fingerprint: DatasetFingerprint,
141 arrow_schema_json: String,
142 json_schema: String,
143 partition_columns: Vec<String>,
144 ) -> Self {
145 let now = Utc::now();
146 Self {
147 namespace,
148 fingerprint,
149 arrow_schema_json,
150 json_schema,
151 partition_columns,
152 created_at: now,
153 updated_at: now,
154 status: DatasetStatus::Active,
155 }
156 }
157}
158
#[cfg(test)]
mod tests {
    use super::*;

    /// Fixture: a namespace built from three short, valid components.
    fn make_ns() -> DatasetNamespace {
        DatasetNamespace::new("cat", "sch", "tbl").unwrap()
    }

    /// Asserts that every `(catalog, schema_name, table)` triple is rejected
    /// by `DatasetNamespace::new`.
    fn assert_all_rejected(cases: &[(&str, &str, &str)]) {
        for &(catalog, schema_name, table) in cases {
            assert!(
                DatasetNamespace::new(catalog, schema_name, table).is_err(),
                "expected {:?} to be rejected",
                (catalog, schema_name, table)
            );
        }
    }

    #[test]
    fn test_fqn() {
        let ns = make_ns();
        assert_eq!(ns.fqn(), "cat.sch.tbl");
    }

    #[test]
    fn test_storage_path() {
        let ns = make_ns();
        assert_eq!(ns.storage_path(), "datasets/cat/sch/tbl");
    }

    #[test]
    fn test_namespace_rejects_path_traversal() {
        assert_all_rejected(&[
            ("../../etc", "sch", "tbl"),
            ("cat", "../etc", "tbl"),
            ("cat", "sch", "../../etc"),
        ]);
    }

    #[test]
    fn test_namespace_rejects_slash() {
        assert_all_rejected(&[("a/b", "sch", "tbl")]);
    }

    #[test]
    fn test_namespace_rejects_double_quote() {
        assert_all_rejected(&[
            ("a\"b", "sch", "tbl"),
            ("cat", "s\"ch", "tbl"),
            ("cat", "sch", "tb\"l"),
        ]);
    }

    #[test]
    fn test_quoted_fqn() {
        assert_eq!(make_ns().quoted_fqn(), "\"cat\".\"sch\".\"tbl\"");

        let dashed = DatasetNamespace::new("my-catalog", "my-schema", "my-table").unwrap();
        assert_eq!(
            dashed.quoted_fqn(),
            "\"my-catalog\".\"my-schema\".\"my-table\""
        );
    }

    #[test]
    fn test_namespace_rejects_empty() {
        assert_all_rejected(&[("", "sch", "tbl"), ("cat", "", "tbl"), ("cat", "sch", "")]);
    }

    #[test]
    fn test_fingerprint_is_32_chars() {
        let fingerprint = DatasetFingerprint::from_schema_json("test");
        assert_eq!(fingerprint.as_str().len(), 32);
    }

    #[test]
    fn test_fingerprint_stability() {
        let first = DatasetFingerprint::from_schema_json("test");
        let second = DatasetFingerprint::from_schema_json("test");
        assert_eq!(first, second);
    }

    #[test]
    fn test_dataset_status_display() {
        let expected = [
            (DatasetStatus::Active, "active"),
            (DatasetStatus::Deprecated, "deprecated"),
        ];
        for (status, text) in &expected {
            assert_eq!(status.to_string(), *text);
        }
    }

    #[test]
    fn test_registration_defaults() {
        let reg = DatasetRegistration::new(
            make_ns(),
            DatasetFingerprint::from_schema_json("s"),
            String::from("{}"),
            String::from("{}"),
            Vec::new(),
        );
        assert_eq!(reg.status, DatasetStatus::Active);
        assert_eq!(reg.created_at, reg.updated_at);
    }
}