1use crate::error::{Error, Result};
19use crate::proto::{PbKeyValue, PbPartitionInfo, PbPartitionSpec};
20use crate::{PartitionId, TableId};
21use std::collections::HashMap;
22use std::fmt::{Display, Formatter};
23use std::sync::Arc;
24
/// An unordered partition specification: a map from partition key name to
/// partition value.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct PartitionSpec {
    /// Partition key name -> partition value.
    partition_spec: HashMap<String, String>,
}
32
33impl PartitionSpec {
34 pub fn new<K: Into<String>, V: Into<String>>(partition_spec: HashMap<K, V>) -> Self {
35 let mut new_map = HashMap::new();
36 for (k, v) in partition_spec {
37 new_map.insert(k.into(), v.into());
38 }
39 Self {
40 partition_spec: new_map,
41 }
42 }
43
44 pub fn get_spec_map(&self) -> &HashMap<String, String> {
45 &self.partition_spec
46 }
47
48 pub fn to_pb(&self) -> PbPartitionSpec {
49 PbPartitionSpec {
50 partition_key_values: self
51 .partition_spec
52 .iter()
53 .map(|(k, v)| PbKeyValue {
54 key: k.clone(),
55 value: v.clone(),
56 })
57 .collect(),
58 }
59 }
60
61 pub fn from_pb(pb: &PbPartitionSpec) -> Self {
62 let partition_spec = pb
63 .partition_key_values
64 .iter()
65 .map(|kv| (kv.key.clone(), kv.value.clone()))
66 .collect();
67 Self { partition_spec }
68 }
69}
70
71impl Display for PartitionSpec {
72 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
73 write!(f, "PartitionSpec{{{:?}}}", self.partition_spec)
74 }
75}
76
/// A partition specification with a fixed key order: `partition_values[i]` is
/// the value for `partition_keys[i]`.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct ResolvedPartitionSpec {
    /// Ordered partition key names.
    partition_keys: Arc<[String]>,
    /// Partition values, positionally paired with `partition_keys`.
    partition_values: Vec<String>,
}
85
/// Separator placed between partition values when they are joined into, or
/// split out of, a partition name.
pub const PARTITION_SPEC_SEPARATOR: &str = "$";
87
88impl ResolvedPartitionSpec {
89 pub fn new(partition_keys: Arc<[String]>, partition_values: Vec<String>) -> Result<Self> {
90 if partition_keys.len() != partition_values.len() {
91 return Err(Error::IllegalArgument {
92 message: "The number of partition keys and partition values should be the same."
93 .to_string(),
94 });
95 }
96
97 Ok(Self {
98 partition_keys,
99 partition_values,
100 })
101 }
102
103 pub fn from_partition_spec(
104 partition_keys: Arc<[String]>,
105 partition_spec: &PartitionSpec,
106 ) -> Self {
107 let partition_values =
108 Self::get_reordered_partition_values(&partition_keys, partition_spec);
109 Self {
110 partition_keys,
111 partition_values,
112 }
113 }
114
115 pub fn from_partition_name(partition_keys: Arc<[String]>, partition_name: &str) -> Self {
116 let partition_values: Vec<String> = partition_name
117 .split(PARTITION_SPEC_SEPARATOR)
118 .map(|s| s.to_string())
119 .collect();
120 Self {
121 partition_keys,
122 partition_values,
123 }
124 }
125
126 pub fn from_partition_qualified_name(qualified_partition_name: &str) -> Result<Self> {
127 let mut keys = Vec::new();
128 let mut values = Vec::new();
129
130 for pair in qualified_partition_name.split('/') {
131 let parts: Vec<&str> = pair.splitn(2, '=').collect();
132 if parts.len() != 2 {
133 return Err(Error::IllegalArgument {
134 message: format!(
135 "Invalid partition name format. Expected key=value, got: {pair}"
136 ),
137 });
138 }
139 keys.push(parts[0].to_string());
140 values.push(parts[1].to_string());
141 }
142
143 Ok(Self {
144 partition_keys: Arc::from(keys),
145 partition_values: values,
146 })
147 }
148
149 pub fn get_partition_keys(&self) -> &[String] {
150 &self.partition_keys
151 }
152
153 pub fn get_partition_values(&self) -> &[String] {
154 &self.partition_values
155 }
156
157 pub fn to_partition_spec(&self) -> PartitionSpec {
158 let mut spec_map = HashMap::new();
159 for (i, key) in self.partition_keys.iter().enumerate() {
160 spec_map.insert(key.clone(), self.partition_values[i].clone());
161 }
162 PartitionSpec::new(spec_map)
163 }
164
165 pub fn get_partition_name(&self) -> String {
169 self.partition_values.join(PARTITION_SPEC_SEPARATOR)
170 }
171
172 pub fn get_partition_qualified_name(&self) -> String {
175 let mut sb = String::new();
176 for (i, key) in self.partition_keys.iter().enumerate() {
177 sb.push_str(key);
178 sb.push('=');
179 sb.push_str(&self.partition_values[i]);
180 if i != self.partition_keys.len() - 1 {
181 sb.push('/');
182 }
183 }
184 sb
185 }
186
187 pub fn contains(&self, other: &ResolvedPartitionSpec) -> Result<bool> {
188 let other_partition_keys = other.get_partition_keys();
189 let other_partition_values = other.get_partition_values();
190
191 let mut expected_partition_values = Vec::new();
192 for other_partition_key in other_partition_keys {
193 let key_index = self
194 .partition_keys
195 .iter()
196 .position(|k| k == other_partition_key);
197 match key_index {
198 Some(idx) => expected_partition_values.push(self.partition_values[idx].clone()),
199 None => {
200 return Err(Error::IllegalArgument {
201 message: format!(
202 "table does not contain partitionKey: {other_partition_key}"
203 ),
204 });
205 }
206 }
207 }
208
209 let expected_partition_name = expected_partition_values.join(PARTITION_SPEC_SEPARATOR);
210 let other_partition_name = other_partition_values.join(PARTITION_SPEC_SEPARATOR);
211
212 Ok(expected_partition_name == other_partition_name)
213 }
214
215 pub fn to_pb(&self) -> PbPartitionSpec {
216 PbPartitionSpec {
217 partition_key_values: self
218 .partition_keys
219 .iter()
220 .zip(self.partition_values.iter())
221 .map(|(k, v)| PbKeyValue {
222 key: k.clone(),
223 value: v.clone(),
224 })
225 .collect(),
226 }
227 }
228
229 pub fn from_pb(pb: &PbPartitionSpec) -> Self {
230 let partition_keys = pb
231 .partition_key_values
232 .iter()
233 .map(|kv| kv.key.clone())
234 .collect();
235 let partition_values = pb
236 .partition_key_values
237 .iter()
238 .map(|kv| kv.value.clone())
239 .collect();
240
241 Self {
242 partition_keys,
243 partition_values,
244 }
245 }
246
247 fn get_reordered_partition_values(
248 partition_keys: &Arc<[String]>,
249 partition_spec: &PartitionSpec,
250 ) -> Vec<String> {
251 let partition_spec_map = partition_spec.get_spec_map();
252 partition_keys
253 .iter()
254 .map(|key| partition_spec_map.get(key).cloned().unwrap_or_default())
255 .collect()
256 }
257}
258
259impl Display for ResolvedPartitionSpec {
260 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
261 write!(f, "{}", self.get_partition_qualified_name())
262 }
263}
264
/// A partition id together with the resolved spec of that partition.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct PartitionInfo {
    /// Identifier of the partition.
    partition_id: PartitionId,
    /// Ordered keys and values of the partition.
    partition_spec: ResolvedPartitionSpec,
}
272
273impl PartitionInfo {
274 pub fn new(partition_id: PartitionId, partition_spec: ResolvedPartitionSpec) -> Self {
275 Self {
276 partition_id,
277 partition_spec,
278 }
279 }
280
281 pub fn get_partition_id(&self) -> PartitionId {
283 self.partition_id
284 }
285
286 pub fn get_partition_name(&self) -> String {
288 self.partition_spec.get_partition_name()
289 }
290
291 pub fn get_resolved_partition_spec(&self) -> &ResolvedPartitionSpec {
292 &self.partition_spec
293 }
294
295 pub fn get_partition_spec(&self) -> PartitionSpec {
296 self.partition_spec.to_partition_spec()
297 }
298
299 pub fn to_pb(&self) -> PbPartitionInfo {
300 PbPartitionInfo {
301 partition_id: self.partition_id,
302 partition_spec: self.partition_spec.to_pb(),
303 }
304 }
305
306 pub fn from_pb(pb: &PbPartitionInfo) -> Self {
307 Self {
308 partition_id: pb.partition_id,
309 partition_spec: ResolvedPartitionSpec::from_pb(&pb.partition_spec),
310 }
311 }
312}
313
314impl Display for PartitionInfo {
315 fn fmt(&self, f: &mut Formatter) -> std::fmt::Result {
316 write!(
317 f,
318 "Partition{{name='{}', id={}}}",
319 self.get_partition_name(),
320 self.partition_id
321 )
322 }
323}
324
/// Identifies a partition by the id of its table and its own partition id.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub struct TablePartition {
    /// Identifier of the table.
    table_id: TableId,
    /// Identifier of the partition.
    partition_id: PartitionId,
}
331
332impl TablePartition {
333 pub fn new(table_id: TableId, partition_id: PartitionId) -> Self {
334 Self {
335 table_id,
336 partition_id,
337 }
338 }
339
340 pub fn get_table_id(&self) -> i64 {
341 self.table_id
342 }
343
344 pub fn get_partition_id(&self) -> PartitionId {
345 self.partition_id
346 }
347}
348
349impl Display for TablePartition {
350 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
351 write!(
352 f,
353 "TablePartition{{tableId={}, partitionId={}}}",
354 self.table_id, self.partition_id
355 )
356 }
357}
358
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds an owned, shareable key list from string literals.
    fn keys(names: &[&str]) -> Arc<[String]> {
        names.iter().map(|name| name.to_string()).collect()
    }

    /// Builds an owned value list from string literals.
    fn values(raw: &[&str]) -> Vec<String> {
        raw.iter().map(|value| value.to_string()).collect()
    }

    /// The two-key spec shared by several tests: `date=2024-01-15/region=US`.
    fn date_region_spec() -> ResolvedPartitionSpec {
        ResolvedPartitionSpec::new(keys(&["date", "region"]), values(&["2024-01-15", "US"]))
            .unwrap()
    }

    /// The single-key spec shared by several tests: `date=2024-01-15`.
    fn date_spec() -> ResolvedPartitionSpec {
        ResolvedPartitionSpec::new(keys(&["date"]), values(&["2024-01-15"])).unwrap()
    }

    #[test]
    fn test_resolved_partition_spec_name() {
        let spec = date_region_spec();

        assert_eq!(spec.get_partition_name(), "2024-01-15$US");
        assert_eq!(
            spec.get_partition_qualified_name(),
            "date=2024-01-15/region=US"
        );
    }

    #[test]
    fn test_resolved_partition_spec_from_partition_name() {
        let spec =
            ResolvedPartitionSpec::from_partition_name(keys(&["date", "region"]), "2024-01-15$US");

        assert_eq!(spec.get_partition_values(), &["2024-01-15", "US"]);
    }

    #[test]
    fn test_resolved_partition_spec_from_qualified_name() {
        let spec =
            ResolvedPartitionSpec::from_partition_qualified_name("date=2024-01-15/region=US")
                .unwrap();

        assert_eq!(spec.get_partition_keys(), &["date", "region"]);
        assert_eq!(spec.get_partition_values(), &["2024-01-15", "US"]);
    }

    #[test]
    fn test_resolved_partition_spec_from_qualified_name_rejects_missing_equals() {
        // The second segment has no `=` and must be reported as an error.
        let result = ResolvedPartitionSpec::from_partition_qualified_name("date=2024-01-15/region");

        assert!(result.is_err());
    }

    #[test]
    fn test_resolved_partition_spec_mismatched_lengths() {
        let result =
            ResolvedPartitionSpec::new(keys(&["date", "region"]), values(&["2024-01-15"]));

        assert!(result.is_err());
    }

    #[test]
    fn test_resolved_partition_spec_to_partition_spec_roundtrip() {
        let spec = date_region_spec();

        let unordered = spec.to_partition_spec();
        assert_eq!(unordered.get_spec_map().len(), 2);

        let restored =
            ResolvedPartitionSpec::from_partition_spec(keys(&["date", "region"]), &unordered);
        assert_eq!(spec, restored);
    }

    #[test]
    fn test_partition_info() {
        let info = PartitionInfo::new(42, date_spec());

        assert_eq!(info.get_partition_id(), 42);
        assert_eq!(info.get_partition_name(), "2024-01-15");
    }

    #[test]
    fn test_table_partition() {
        let tp = TablePartition::new(100, 42);

        assert_eq!(tp.get_table_id(), 100);
        assert_eq!(tp.get_partition_id(), 42);
    }

    #[test]
    fn test_partition_spec_pb_roundtrip() {
        let mut map = HashMap::new();
        map.insert("date".to_string(), "2024-01-15".to_string());
        map.insert("region".to_string(), "US".to_string());
        let spec = PartitionSpec::new(map);

        let pb = spec.to_pb();
        let restored = PartitionSpec::from_pb(&pb);

        assert_eq!(
            spec.get_spec_map().get("date"),
            restored.get_spec_map().get("date")
        );
        // Compare the whole map, not just one key.
        assert_eq!(spec, restored);
    }

    #[test]
    fn test_partition_info_pb_roundtrip() {
        let info = PartitionInfo::new(42, date_spec());

        let pb = info.to_pb();
        let restored = PartitionInfo::from_pb(&pb);

        assert_eq!(info.get_partition_id(), restored.get_partition_id());
        assert_eq!(info.get_partition_name(), restored.get_partition_name());
        // Compare the whole value, including the partition keys.
        assert_eq!(info, restored);
    }

    #[test]
    fn test_contains() {
        let full_spec = date_region_spec();
        let partial_spec = date_spec();

        assert!(full_spec.contains(&partial_spec).unwrap());
    }

    #[test]
    fn test_contains_value_mismatch() {
        let full_spec = date_region_spec();
        let other = ResolvedPartitionSpec::new(keys(&["region"]), values(&["EU"])).unwrap();

        assert!(!full_spec.contains(&other).unwrap());
    }

    #[test]
    fn test_contains_unknown_key() {
        let full_spec = date_region_spec();
        let other = ResolvedPartitionSpec::new(keys(&["country"]), values(&["US"])).unwrap();

        assert!(full_spec.contains(&other).is_err());
    }

    #[test]
    fn test_display() {
        assert_eq!(date_region_spec().to_string(), "date=2024-01-15/region=US");
        assert_eq!(
            PartitionInfo::new(42, date_spec()).to_string(),
            "Partition{name='2024-01-15', id=42}"
        );
        assert_eq!(
            TablePartition::new(100, 42).to_string(),
            "TablePartition{tableId=100, partitionId=42}"
        );
    }
}