1use serde::{Deserialize, Serialize};
6
7use crate::columnar::{ColumnarProfile, DocumentMode, StrictSchema};
8use crate::kv::{KV_DEFAULT_INLINE_THRESHOLD, KvConfig, KvTtlPolicy};
9
10#[derive(
17 Debug,
18 Clone,
19 PartialEq,
20 Eq,
21 Serialize,
22 Deserialize,
23 zerompk::ToMessagePack,
24 zerompk::FromMessagePack,
25)]
26#[serde(tag = "storage")]
27pub enum CollectionType {
28 Document(DocumentMode),
31 Columnar(ColumnarProfile),
34 KeyValue(KvConfig),
38}
39
40impl Default for CollectionType {
41 fn default() -> Self {
42 Self::Document(DocumentMode::default())
43 }
44}
45
46impl CollectionType {
47 pub fn document() -> Self {
49 Self::Document(DocumentMode::Schemaless)
50 }
51
52 pub fn strict(schema: StrictSchema) -> Self {
54 Self::Document(DocumentMode::Strict(schema))
55 }
56
57 pub fn columnar() -> Self {
59 Self::Columnar(ColumnarProfile::Plain)
60 }
61
62 pub fn timeseries(time_key: impl Into<String>, interval: impl Into<String>) -> Self {
64 Self::Columnar(ColumnarProfile::Timeseries {
65 time_key: time_key.into(),
66 interval: interval.into(),
67 })
68 }
69
70 pub fn spatial(geometry_column: impl Into<String>) -> Self {
72 Self::Columnar(ColumnarProfile::Spatial {
73 geometry_column: geometry_column.into(),
74 auto_rtree: true,
75 auto_geohash: true,
76 })
77 }
78
79 pub fn kv(schema: StrictSchema) -> Self {
84 Self::KeyValue(KvConfig {
85 schema,
86 ttl: None,
87 capacity_hint: 0,
88 inline_threshold: KV_DEFAULT_INLINE_THRESHOLD,
89 })
90 }
91
92 pub fn kv_with_ttl(schema: StrictSchema, ttl: KvTtlPolicy) -> Self {
94 Self::KeyValue(KvConfig {
95 schema,
96 ttl: Some(ttl),
97 capacity_hint: 0,
98 inline_threshold: KV_DEFAULT_INLINE_THRESHOLD,
99 })
100 }
101
102 pub fn is_document(&self) -> bool {
103 matches!(self, Self::Document(_))
104 }
105
106 pub fn is_columnar_family(&self) -> bool {
109 matches!(self, Self::Columnar(_))
110 }
111
112 pub fn is_plain_columnar(&self) -> bool {
113 matches!(self, Self::Columnar(ColumnarProfile::Plain))
114 }
115
116 pub fn is_timeseries(&self) -> bool {
117 matches!(self, Self::Columnar(ColumnarProfile::Timeseries { .. }))
118 }
119
120 pub fn is_spatial(&self) -> bool {
121 matches!(self, Self::Columnar(ColumnarProfile::Spatial { .. }))
122 }
123
124 pub fn is_strict(&self) -> bool {
125 matches!(self, Self::Document(DocumentMode::Strict(_)))
126 }
127
128 pub fn is_schemaless(&self) -> bool {
129 matches!(self, Self::Document(DocumentMode::Schemaless))
130 }
131
132 pub fn is_kv(&self) -> bool {
133 matches!(self, Self::KeyValue(_))
134 }
135
136 pub fn as_str(&self) -> &'static str {
137 match self {
138 Self::Document(DocumentMode::Schemaless) => "document",
139 Self::Document(DocumentMode::Strict(_)) => "strict",
140 Self::Columnar(ColumnarProfile::Plain) => "columnar",
141 Self::Columnar(ColumnarProfile::Timeseries { .. }) => "timeseries",
142 Self::Columnar(ColumnarProfile::Spatial { .. }) => "columnar:spatial",
143 Self::KeyValue(_) => "kv",
144 }
145 }
146
147 pub fn document_mode(&self) -> Option<&DocumentMode> {
149 match self {
150 Self::Document(mode) => Some(mode),
151 _ => None,
152 }
153 }
154
155 pub fn columnar_profile(&self) -> Option<&ColumnarProfile> {
157 match self {
158 Self::Columnar(profile) => Some(profile),
159 _ => None,
160 }
161 }
162
163 pub fn kv_config(&self) -> Option<&KvConfig> {
165 match self {
166 Self::KeyValue(config) => Some(config),
167 _ => None,
168 }
169 }
170}
171
172impl std::fmt::Display for CollectionType {
173 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
174 f.write_str(self.as_str())
175 }
176}
177
178impl std::str::FromStr for CollectionType {
179 type Err = String;
180
181 fn from_str(s: &str) -> Result<Self, Self::Err> {
182 match s.to_lowercase().as_str() {
183 "document" | "doc" => Ok(Self::document()),
184 "strict" => Ok(Self::Document(DocumentMode::Strict(
185 StrictSchema {
188 columns: vec![],
189 version: 1,
190 dropped_columns: Vec::new(),
191 },
192 ))),
193 "columnar" => Ok(Self::columnar()),
194 "timeseries" | "ts" => Ok(Self::timeseries("time", "1h")),
195 "kv" | "key_value" | "keyvalue" => Ok(Self::KeyValue(KvConfig {
196 schema: StrictSchema {
198 columns: vec![],
199 version: 1,
200 dropped_columns: Vec::new(),
201 },
202 ttl: None,
203 capacity_hint: 0,
204 inline_threshold: KV_DEFAULT_INLINE_THRESHOLD,
205 })),
206 other => Err(format!("unknown collection type: '{other}'")),
207 }
208 }
209}
210
211#[cfg(test)]
212mod tests {
213 use super::*;
214 use crate::columnar::{ColumnDef, ColumnType};
215
216 #[test]
217 fn default_is_schemaless_document() {
218 let ct = CollectionType::default();
219 assert!(ct.is_document());
220 assert!(ct.is_schemaless());
221 assert!(!ct.is_columnar_family());
222 assert!(!ct.is_timeseries());
223 assert!(!ct.is_kv());
224 }
225
226 #[test]
227 fn factory_methods() {
228 assert!(CollectionType::document().is_schemaless());
229 assert!(CollectionType::columnar().is_columnar_family());
230 assert!(CollectionType::timeseries("time", "1h").is_timeseries());
231 assert!(CollectionType::spatial("geom").is_columnar_family());
232 assert!(CollectionType::spatial("geom").is_spatial());
233
234 let schema = StrictSchema::new(vec![
235 ColumnDef::required("key", ColumnType::String).with_primary_key(),
236 ColumnDef::nullable("value", ColumnType::Bytes),
237 ])
238 .unwrap();
239 let kv = CollectionType::kv(schema);
240 assert!(kv.is_kv());
241 assert!(!kv.is_document());
242 assert!(!kv.is_columnar_family());
243 }
244
245 #[test]
246 fn kv_with_ttl_factory() {
247 let schema = StrictSchema::new(vec![
248 ColumnDef::required("ip", ColumnType::String).with_primary_key(),
249 ColumnDef::required("hits", ColumnType::Int64),
250 ])
251 .unwrap();
252 let ttl = KvTtlPolicy::FixedDuration {
253 duration_ms: 60_000,
254 };
255 let ct = CollectionType::kv_with_ttl(schema, ttl);
256 assert!(ct.is_kv());
257 let config = ct.kv_config().unwrap();
258 assert!(config.has_ttl());
259 match config.ttl.as_ref().unwrap() {
260 KvTtlPolicy::FixedDuration { duration_ms } => assert_eq!(*duration_ms, 60_000),
261 _ => panic!("expected FixedDuration"),
262 }
263 }
264
265 #[test]
266 fn serde_roundtrip_document() {
267 let ct = CollectionType::document();
268 let json = sonic_rs::to_string(&ct).unwrap();
269 let back: CollectionType = sonic_rs::from_str(&json).unwrap();
270 assert_eq!(back, ct);
271 }
272
273 #[test]
274 fn serde_roundtrip_columnar() {
275 let ct = CollectionType::columnar();
276 let json = sonic_rs::to_string(&ct).unwrap();
277 let back: CollectionType = sonic_rs::from_str(&json).unwrap();
278 assert_eq!(back, ct);
279 }
280
281 #[test]
282 fn serde_roundtrip_timeseries() {
283 let ct = CollectionType::timeseries("ts", "1h");
284 let json = sonic_rs::to_string(&ct).unwrap();
285 let back: CollectionType = sonic_rs::from_str(&json).unwrap();
286 assert_eq!(back, ct);
287 }
288
289 #[test]
290 fn serde_roundtrip_kv_no_ttl() {
291 let schema = StrictSchema::new(vec![
292 ColumnDef::required("k", ColumnType::String).with_primary_key(),
293 ColumnDef::nullable("v", ColumnType::Bytes),
294 ])
295 .unwrap();
296 let ct = CollectionType::kv(schema);
297 let json = sonic_rs::to_string(&ct).unwrap();
298 let back: CollectionType = sonic_rs::from_str(&json).unwrap();
299 assert_eq!(back, ct);
300 }
301
302 #[test]
303 fn serde_roundtrip_kv_fixed_ttl() {
304 let schema = StrictSchema::new(vec![
305 ColumnDef::required("k", ColumnType::String).with_primary_key(),
306 ColumnDef::required("v", ColumnType::Bytes),
307 ])
308 .unwrap();
309 let ttl = KvTtlPolicy::FixedDuration {
310 duration_ms: 900_000,
311 };
312 let ct = CollectionType::kv_with_ttl(schema, ttl);
313 let json = sonic_rs::to_string(&ct).unwrap();
314 let back: CollectionType = sonic_rs::from_str(&json).unwrap();
315 assert_eq!(back, ct);
316 }
317
318 #[test]
319 fn serde_roundtrip_kv_field_ttl() {
320 let schema = StrictSchema::new(vec![
321 ColumnDef::required("k", ColumnType::String).with_primary_key(),
322 ColumnDef::required("last_active", ColumnType::Timestamp),
323 ])
324 .unwrap();
325 let ttl = KvTtlPolicy::FieldBased {
326 field: "last_active".into(),
327 offset_ms: 3_600_000,
328 };
329 let ct = CollectionType::kv_with_ttl(schema, ttl);
330 let json = sonic_rs::to_string(&ct).unwrap();
331 let back: CollectionType = sonic_rs::from_str(&json).unwrap();
332 assert_eq!(back, ct);
333 }
334
335 #[test]
336 fn display() {
337 assert_eq!(CollectionType::document().to_string(), "document");
338 assert_eq!(CollectionType::columnar().to_string(), "columnar");
339 assert_eq!(
340 CollectionType::timeseries("time", "1h").to_string(),
341 "timeseries"
342 );
343
344 let schema = StrictSchema::new(vec![
345 ColumnDef::required("k", ColumnType::String).with_primary_key(),
346 ])
347 .unwrap();
348 assert_eq!(CollectionType::kv(schema).to_string(), "kv");
349 }
350
351 #[test]
352 fn from_str() {
353 assert!("document".parse::<CollectionType>().unwrap().is_document());
354 assert!(
355 "columnar"
356 .parse::<CollectionType>()
357 .unwrap()
358 .is_columnar_family()
359 );
360 assert!(
361 "timeseries"
362 .parse::<CollectionType>()
363 .unwrap()
364 .is_timeseries()
365 );
366 assert!("ts".parse::<CollectionType>().unwrap().is_timeseries());
367 assert!("kv".parse::<CollectionType>().unwrap().is_kv());
368 assert!("key_value".parse::<CollectionType>().unwrap().is_kv());
369 assert!("keyvalue".parse::<CollectionType>().unwrap().is_kv());
370 assert!("unknown".parse::<CollectionType>().is_err());
371 }
372
373 #[test]
374 fn accessors() {
375 let ct = CollectionType::timeseries("time", "1h");
376 assert!(ct.columnar_profile().is_some());
377 assert!(ct.document_mode().is_none());
378 assert!(ct.kv_config().is_none());
379
380 let doc = CollectionType::document();
381 assert!(doc.document_mode().is_some());
382 assert!(doc.columnar_profile().is_none());
383 assert!(doc.kv_config().is_none());
384
385 let schema = StrictSchema::new(vec![
386 ColumnDef::required("k", ColumnType::String).with_primary_key(),
387 ])
388 .unwrap();
389 let kv = CollectionType::kv(schema);
390 assert!(kv.kv_config().is_some());
391 assert!(kv.document_mode().is_none());
392 assert!(kv.columnar_profile().is_none());
393 }
394}