Skip to main content

scouter_types/dataset/
types.rs

1use chrono::{DateTime, Utc};
2use serde::{Deserialize, Serialize};
3use sha2::{Digest, Sha256};
4
5use crate::dataset::error::DatasetError;
6
7fn validate_namespace_component(name: &str, label: &str) -> Result<(), DatasetError> {
8    if name.is_empty() {
9        return Err(DatasetError::SchemaParseError(format!(
10            "{label} must not be empty"
11        )));
12    }
13    if name.contains('/') || name.contains('.') || name.contains('"') {
14        return Err(DatasetError::SchemaParseError(format!(
15            "{label} must not contain '/', '.', or '\"'"
16        )));
17    }
18    Ok(())
19}
20
21#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
22pub struct DatasetNamespace {
23    pub catalog: String,
24    pub schema_name: String,
25    pub table: String,
26}
27
28impl DatasetNamespace {
29    pub fn new(
30        catalog: impl Into<String>,
31        schema_name: impl Into<String>,
32        table: impl Into<String>,
33    ) -> Result<Self, DatasetError> {
34        let catalog = catalog.into();
35        let schema_name = schema_name.into();
36        let table = table.into();
37        validate_namespace_component(&catalog, "catalog")?;
38        validate_namespace_component(&schema_name, "schema_name")?;
39        validate_namespace_component(&table, "table")?;
40        Ok(Self {
41            catalog,
42            schema_name,
43            table,
44        })
45    }
46
47    pub fn fqn(&self) -> String {
48        format!("{}.{}.{}", self.catalog, self.schema_name, self.table)
49    }
50
51    pub fn quoted_fqn(&self) -> String {
52        format!(
53            "\"{}\".\"{}\".\"{}\"",
54            self.catalog, self.schema_name, self.table
55        )
56    }
57
58    pub fn storage_path(&self) -> String {
59        format!(
60            "datasets/{}/{}/{}",
61            self.catalog, self.schema_name, self.table
62        )
63    }
64}
65
66#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
67pub struct DatasetFingerprint(pub String);
68
69impl DatasetFingerprint {
70    /// Compute a stable fingerprint from the canonical Arrow schema JSON.
71    /// Uses SHA-256, truncated to 32 hex chars (128 bits) for compactness.
72    pub fn from_schema_json(arrow_schema_json: &str) -> Self {
73        let mut hasher = Sha256::new();
74        hasher.update(arrow_schema_json.as_bytes());
75        let hash = hasher.finalize();
76        let hex = hex::encode(hash);
77        DatasetFingerprint(hex[..32].to_string())
78    }
79
80    pub fn as_str(&self) -> &str {
81        &self.0
82    }
83}
84
85impl std::fmt::Display for DatasetFingerprint {
86    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
87        write!(f, "{}", self.0)
88    }
89}
90
91#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
92#[serde(rename_all = "lowercase")]
93pub enum DatasetStatus {
94    Active,
95    Deprecated,
96}
97
98impl std::fmt::Display for DatasetStatus {
99    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
100        match self {
101            DatasetStatus::Active => write!(f, "active"),
102            DatasetStatus::Deprecated => write!(f, "deprecated"),
103        }
104    }
105}
106
107impl std::str::FromStr for DatasetStatus {
108    type Err = DatasetError;
109
110    fn from_str(s: &str) -> Result<Self, Self::Err> {
111        match s {
112            "active" => Ok(DatasetStatus::Active),
113            "deprecated" => Ok(DatasetStatus::Deprecated),
114            other => Err(DatasetError::SchemaParseError(format!(
115                "Unknown dataset status: '{}'",
116                other
117            ))),
118        }
119    }
120}
121
122#[derive(Debug, Clone, Serialize, Deserialize)]
123pub struct DatasetRegistration {
124    pub namespace: DatasetNamespace,
125    pub fingerprint: DatasetFingerprint,
126    /// Arrow schema serialized to JSON (IPC schema format)
127    pub arrow_schema_json: String,
128    /// Original Pydantic JSON Schema for client-side reconstruction
129    pub json_schema: String,
130    /// User-specified partition columns beyond the default `scouter_partition_date`
131    pub partition_columns: Vec<String>,
132    pub created_at: DateTime<Utc>,
133    pub updated_at: DateTime<Utc>,
134    pub status: DatasetStatus,
135}
136
137impl DatasetRegistration {
138    pub fn new(
139        namespace: DatasetNamespace,
140        fingerprint: DatasetFingerprint,
141        arrow_schema_json: String,
142        json_schema: String,
143        partition_columns: Vec<String>,
144    ) -> Self {
145        let now = Utc::now();
146        Self {
147            namespace,
148            fingerprint,
149            arrow_schema_json,
150            json_schema,
151            partition_columns,
152            created_at: now,
153            updated_at: now,
154            status: DatasetStatus::Active,
155        }
156    }
157}
158
#[cfg(test)]
mod tests {
    use super::*;

    fn make_ns() -> DatasetNamespace {
        DatasetNamespace::new("cat", "sch", "tbl").unwrap()
    }

    #[test]
    fn test_fqn() {
        assert_eq!(make_ns().fqn(), "cat.sch.tbl");
    }

    #[test]
    fn test_storage_path() {
        assert_eq!(make_ns().storage_path(), "datasets/cat/sch/tbl");
    }

    #[test]
    fn test_namespace_rejects_path_traversal() {
        assert!(DatasetNamespace::new("../../etc", "sch", "tbl").is_err());
        assert!(DatasetNamespace::new("cat", "../etc", "tbl").is_err());
        assert!(DatasetNamespace::new("cat", "sch", "../../etc").is_err());
    }

    #[test]
    fn test_namespace_rejects_slash() {
        assert!(DatasetNamespace::new("a/b", "sch", "tbl").is_err());
    }

    #[test]
    fn test_namespace_rejects_dot() {
        assert!(DatasetNamespace::new("a.b", "sch", "tbl").is_err());
        assert!(DatasetNamespace::new("cat", "s.ch", "tbl").is_err());
        assert!(DatasetNamespace::new("cat", "sch", "t.bl").is_err());
    }

    #[test]
    fn test_namespace_rejects_double_quote() {
        assert!(DatasetNamespace::new("a\"b", "sch", "tbl").is_err());
        assert!(DatasetNamespace::new("cat", "s\"ch", "tbl").is_err());
        assert!(DatasetNamespace::new("cat", "sch", "tb\"l").is_err());
    }

    #[test]
    fn test_quoted_fqn() {
        assert_eq!(make_ns().quoted_fqn(), r#""cat"."sch"."tbl""#);
        let ns = DatasetNamespace::new("my-catalog", "my-schema", "my-table").unwrap();
        assert_eq!(ns.quoted_fqn(), r#""my-catalog"."my-schema"."my-table""#);
    }

    #[test]
    fn test_namespace_rejects_empty() {
        assert!(DatasetNamespace::new("", "sch", "tbl").is_err());
        assert!(DatasetNamespace::new("cat", "", "tbl").is_err());
        assert!(DatasetNamespace::new("cat", "sch", "").is_err());
    }

    #[test]
    fn test_fingerprint_is_32_chars() {
        let fp = DatasetFingerprint::from_schema_json("test");
        assert_eq!(fp.as_str().len(), 32);
    }

    #[test]
    fn test_fingerprint_stability() {
        let fp1 = DatasetFingerprint::from_schema_json("test");
        let fp2 = DatasetFingerprint::from_schema_json("test");
        assert_eq!(fp1, fp2);
    }

    #[test]
    fn test_fingerprint_known_value() {
        // SHA-256("test") = 9f86d081884c7d659a2feaa0c55ad015a3bf4f1b2b0b822cd15d6c15b0f00a08
        let fp = DatasetFingerprint::from_schema_json("test");
        assert_eq!(fp.as_str(), "9f86d081884c7d659a2feaa0c55ad015");
        assert_eq!(fp.to_string(), fp.as_str());
    }

    #[test]
    fn test_fingerprint_differs_for_different_input() {
        assert_ne!(
            DatasetFingerprint::from_schema_json("a"),
            DatasetFingerprint::from_schema_json("b")
        );
    }

    #[test]
    fn test_dataset_status_display() {
        assert_eq!(DatasetStatus::Active.to_string(), "active");
        assert_eq!(DatasetStatus::Deprecated.to_string(), "deprecated");
    }

    #[test]
    fn test_dataset_status_parse_round_trip() {
        assert_eq!(
            DatasetStatus::Active.to_string().parse::<DatasetStatus>().unwrap(),
            DatasetStatus::Active
        );
        assert_eq!(
            DatasetStatus::Deprecated
                .to_string()
                .parse::<DatasetStatus>()
                .unwrap(),
            DatasetStatus::Deprecated
        );
    }

    #[test]
    fn test_dataset_status_rejects_unknown() {
        assert!("Active".parse::<DatasetStatus>().is_err());
        assert!("archived".parse::<DatasetStatus>().is_err());
        assert!("".parse::<DatasetStatus>().is_err());
    }

    #[test]
    fn test_registration_defaults() {
        let ns = make_ns();
        let fp = DatasetFingerprint::from_schema_json("s");
        let reg = DatasetRegistration::new(ns, fp, "{}".into(), "{}".into(), vec![]);
        assert_eq!(reg.status, DatasetStatus::Active);
        assert_eq!(reg.created_at, reg.updated_at);
    }
}