Skip to main content

fluss/metadata/
partition.rs

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.
17
18use crate::error::{Error, Result};
19use crate::proto::{PbKeyValue, PbPartitionInfo, PbPartitionSpec};
20use crate::{PartitionId, TableId};
21use std::collections::HashMap;
22use std::fmt::{Display, Formatter};
23use std::sync::Arc;
24
25/// Represents a partition spec in fluss. Partition columns and values are NOT of strict order, and
26/// they need to be re-arranged to the correct order by comparing with a list of strictly ordered
27/// partition keys.
28#[derive(Debug, Clone, PartialEq, Eq)]
29pub struct PartitionSpec {
30    partition_spec: HashMap<String, String>,
31}
32
33impl PartitionSpec {
34    pub fn new<K: Into<String>, V: Into<String>>(partition_spec: HashMap<K, V>) -> Self {
35        let mut new_map = HashMap::new();
36        for (k, v) in partition_spec {
37            new_map.insert(k.into(), v.into());
38        }
39        Self {
40            partition_spec: new_map,
41        }
42    }
43
44    pub fn get_spec_map(&self) -> &HashMap<String, String> {
45        &self.partition_spec
46    }
47
48    pub fn to_pb(&self) -> PbPartitionSpec {
49        PbPartitionSpec {
50            partition_key_values: self
51                .partition_spec
52                .iter()
53                .map(|(k, v)| PbKeyValue {
54                    key: k.clone(),
55                    value: v.clone(),
56                })
57                .collect(),
58        }
59    }
60
61    pub fn from_pb(pb: &PbPartitionSpec) -> Self {
62        let partition_spec = pb
63            .partition_key_values
64            .iter()
65            .map(|kv| (kv.key.clone(), kv.value.clone()))
66            .collect();
67        Self { partition_spec }
68    }
69}
70
71impl Display for PartitionSpec {
72    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
73        write!(f, "PartitionSpec{{{:?}}}", self.partition_spec)
74    }
75}
76
77/// Represents a partition, which is the resolved version of PartitionSpec. The partition
78/// spec is re-arranged into the correct order by comparing it with a list of strictly ordered
79/// partition keys.
80#[derive(Debug, Clone, PartialEq, Eq, Hash)]
81pub struct ResolvedPartitionSpec {
82    partition_keys: Arc<[String]>,
83    partition_values: Vec<String>,
84}
85
86pub const PARTITION_SPEC_SEPARATOR: &str = "$";
87
88impl ResolvedPartitionSpec {
89    pub fn new(partition_keys: Arc<[String]>, partition_values: Vec<String>) -> Result<Self> {
90        if partition_keys.len() != partition_values.len() {
91            return Err(Error::IllegalArgument {
92                message: "The number of partition keys and partition values should be the same."
93                    .to_string(),
94            });
95        }
96
97        Ok(Self {
98            partition_keys,
99            partition_values,
100        })
101    }
102
103    pub fn from_partition_spec(
104        partition_keys: Arc<[String]>,
105        partition_spec: &PartitionSpec,
106    ) -> Self {
107        let partition_values =
108            Self::get_reordered_partition_values(&partition_keys, partition_spec);
109        Self {
110            partition_keys,
111            partition_values,
112        }
113    }
114
115    pub fn from_partition_name(partition_keys: Arc<[String]>, partition_name: &str) -> Self {
116        let partition_values: Vec<String> = partition_name
117            .split(PARTITION_SPEC_SEPARATOR)
118            .map(|s| s.to_string())
119            .collect();
120        Self {
121            partition_keys,
122            partition_values,
123        }
124    }
125
126    pub fn from_partition_qualified_name(qualified_partition_name: &str) -> Result<Self> {
127        let mut keys = Vec::new();
128        let mut values = Vec::new();
129
130        for pair in qualified_partition_name.split('/') {
131            let parts: Vec<&str> = pair.splitn(2, '=').collect();
132            if parts.len() != 2 {
133                return Err(Error::IllegalArgument {
134                    message: format!(
135                        "Invalid partition name format. Expected key=value, got: {pair}"
136                    ),
137                });
138            }
139            keys.push(parts[0].to_string());
140            values.push(parts[1].to_string());
141        }
142
143        Ok(Self {
144            partition_keys: Arc::from(keys),
145            partition_values: values,
146        })
147    }
148
149    pub fn get_partition_keys(&self) -> &[String] {
150        &self.partition_keys
151    }
152
153    pub fn get_partition_values(&self) -> &[String] {
154        &self.partition_values
155    }
156
157    pub fn to_partition_spec(&self) -> PartitionSpec {
158        let mut spec_map = HashMap::new();
159        for (i, key) in self.partition_keys.iter().enumerate() {
160            spec_map.insert(key.clone(), self.partition_values[i].clone());
161        }
162        PartitionSpec::new(spec_map)
163    }
164
165    /// Generate the partition name for a partition table with specified partition values.
166    ///
167    /// The partition name is in the following format: value1$value2$...$valueN
168    pub fn get_partition_name(&self) -> String {
169        self.partition_values.join(PARTITION_SPEC_SEPARATOR)
170    }
171
172    /// Returns the qualified partition name for a partition spec.
173    /// The format is: key1=value1/key2=value2/.../keyN=valueN
174    pub fn get_partition_qualified_name(&self) -> String {
175        let mut sb = String::new();
176        for (i, key) in self.partition_keys.iter().enumerate() {
177            sb.push_str(key);
178            sb.push('=');
179            sb.push_str(&self.partition_values[i]);
180            if i != self.partition_keys.len() - 1 {
181                sb.push('/');
182            }
183        }
184        sb
185    }
186
187    pub fn contains(&self, other: &ResolvedPartitionSpec) -> Result<bool> {
188        let other_partition_keys = other.get_partition_keys();
189        let other_partition_values = other.get_partition_values();
190
191        let mut expected_partition_values = Vec::new();
192        for other_partition_key in other_partition_keys {
193            let key_index = self
194                .partition_keys
195                .iter()
196                .position(|k| k == other_partition_key);
197            match key_index {
198                Some(idx) => expected_partition_values.push(self.partition_values[idx].clone()),
199                None => {
200                    return Err(Error::IllegalArgument {
201                        message: format!(
202                            "table does not contain partitionKey: {other_partition_key}"
203                        ),
204                    });
205                }
206            }
207        }
208
209        let expected_partition_name = expected_partition_values.join(PARTITION_SPEC_SEPARATOR);
210        let other_partition_name = other_partition_values.join(PARTITION_SPEC_SEPARATOR);
211
212        Ok(expected_partition_name == other_partition_name)
213    }
214
215    pub fn to_pb(&self) -> PbPartitionSpec {
216        PbPartitionSpec {
217            partition_key_values: self
218                .partition_keys
219                .iter()
220                .zip(self.partition_values.iter())
221                .map(|(k, v)| PbKeyValue {
222                    key: k.clone(),
223                    value: v.clone(),
224                })
225                .collect(),
226        }
227    }
228
229    pub fn from_pb(pb: &PbPartitionSpec) -> Self {
230        let partition_keys = pb
231            .partition_key_values
232            .iter()
233            .map(|kv| kv.key.clone())
234            .collect();
235        let partition_values = pb
236            .partition_key_values
237            .iter()
238            .map(|kv| kv.value.clone())
239            .collect();
240
241        Self {
242            partition_keys,
243            partition_values,
244        }
245    }
246
247    fn get_reordered_partition_values(
248        partition_keys: &Arc<[String]>,
249        partition_spec: &PartitionSpec,
250    ) -> Vec<String> {
251        let partition_spec_map = partition_spec.get_spec_map();
252        partition_keys
253            .iter()
254            .map(|key| partition_spec_map.get(key).cloned().unwrap_or_default())
255            .collect()
256    }
257}
258
259impl Display for ResolvedPartitionSpec {
260    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
261        write!(f, "{}", self.get_partition_qualified_name())
262    }
263}
264
265/// Information of a partition metadata, includes the partition's name and the partition id that
266/// represents the unique identifier of the partition.
267#[derive(Debug, Clone, PartialEq, Eq, Hash)]
268pub struct PartitionInfo {
269    partition_id: PartitionId,
270    partition_spec: ResolvedPartitionSpec,
271}
272
273impl PartitionInfo {
274    pub fn new(partition_id: PartitionId, partition_spec: ResolvedPartitionSpec) -> Self {
275        Self {
276            partition_id,
277            partition_spec,
278        }
279    }
280
281    /// Get the partition id. The id is globally unique in the Fluss cluster.
282    pub fn get_partition_id(&self) -> PartitionId {
283        self.partition_id
284    }
285
286    /// Get the partition name.
287    pub fn get_partition_name(&self) -> String {
288        self.partition_spec.get_partition_name()
289    }
290
291    pub fn get_resolved_partition_spec(&self) -> &ResolvedPartitionSpec {
292        &self.partition_spec
293    }
294
295    pub fn get_partition_spec(&self) -> PartitionSpec {
296        self.partition_spec.to_partition_spec()
297    }
298
299    pub fn to_pb(&self) -> PbPartitionInfo {
300        PbPartitionInfo {
301            partition_id: self.partition_id,
302            partition_spec: self.partition_spec.to_pb(),
303        }
304    }
305
306    pub fn from_pb(pb: &PbPartitionInfo) -> Self {
307        Self {
308            partition_id: pb.partition_id,
309            partition_spec: ResolvedPartitionSpec::from_pb(&pb.partition_spec),
310        }
311    }
312}
313
314impl Display for PartitionInfo {
315    fn fmt(&self, f: &mut Formatter) -> std::fmt::Result {
316        write!(
317            f,
318            "Partition{{name='{}', id={}}}",
319            self.get_partition_name(),
320            self.partition_id
321        )
322    }
323}
324
325/// A class to identify a table partition, containing the table id and the partition id.
326#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
327pub struct TablePartition {
328    table_id: TableId,
329    partition_id: PartitionId,
330}
331
332impl TablePartition {
333    pub fn new(table_id: TableId, partition_id: PartitionId) -> Self {
334        Self {
335            table_id,
336            partition_id,
337        }
338    }
339
340    pub fn get_table_id(&self) -> i64 {
341        self.table_id
342    }
343
344    pub fn get_partition_id(&self) -> PartitionId {
345        self.partition_id
346    }
347}
348
349impl Display for TablePartition {
350    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
351        write!(
352            f,
353            "TablePartition{{tableId={}, partitionId={}}}",
354            self.table_id, self.partition_id
355        )
356    }
357}
358
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds an ordered partition-key list from string literals.
    fn keys(names: &[&str]) -> Arc<[String]> {
        names.iter().map(|name| name.to_string()).collect()
    }

    /// Builds a partition-value list from string literals.
    fn values(vals: &[&str]) -> Vec<String> {
        vals.iter().map(|val| val.to_string()).collect()
    }

    /// A two-level spec shared by several tests: date=2024-01-15/region=US.
    fn date_region_spec() -> ResolvedPartitionSpec {
        ResolvedPartitionSpec::new(keys(&["date", "region"]), values(&["2024-01-15", "US"]))
            .unwrap()
    }

    #[test]
    fn test_resolved_partition_spec_name() {
        let spec = date_region_spec();

        assert_eq!(spec.get_partition_name(), "2024-01-15$US");
        assert_eq!(
            spec.get_partition_qualified_name(),
            "date=2024-01-15/region=US"
        );
        assert_eq!(spec.to_string(), "date=2024-01-15/region=US");
    }

    #[test]
    fn test_resolved_partition_spec_from_partition_name() {
        let spec =
            ResolvedPartitionSpec::from_partition_name(keys(&["date", "region"]), "2024-01-15$US");

        assert_eq!(spec.get_partition_values(), &["2024-01-15", "US"]);
        assert_eq!(spec, date_region_spec());
    }

    #[test]
    fn test_resolved_partition_spec_from_qualified_name() {
        let spec =
            ResolvedPartitionSpec::from_partition_qualified_name("date=2024-01-15/region=US")
                .unwrap();

        assert_eq!(spec.get_partition_keys(), &["date", "region"]);
        assert_eq!(spec.get_partition_values(), &["2024-01-15", "US"]);
    }

    #[test]
    fn test_resolved_partition_spec_from_invalid_qualified_name() {
        assert!(ResolvedPartitionSpec::from_partition_qualified_name("date").is_err());
        assert!(
            ResolvedPartitionSpec::from_partition_qualified_name("date=2024-01-15/region")
                .is_err()
        );
    }

    #[test]
    fn test_resolved_partition_spec_from_partition_spec_reorders() {
        let mut map = HashMap::new();
        map.insert("region", "US");
        map.insert("date", "2024-01-15");

        let spec = ResolvedPartitionSpec::from_partition_spec(
            keys(&["date", "region"]),
            &PartitionSpec::new(map),
        );

        assert_eq!(spec, date_region_spec());
        // Converting to an unordered spec and resolving again must give the same result.
        assert_eq!(
            ResolvedPartitionSpec::from_partition_spec(
                keys(&["date", "region"]),
                &spec.to_partition_spec()
            ),
            spec
        );
    }

    #[test]
    fn test_resolved_partition_spec_mismatched_lengths() {
        let result =
            ResolvedPartitionSpec::new(keys(&["date", "region"]), values(&["2024-01-15"]));

        assert!(result.is_err());
    }

    #[test]
    fn test_resolved_partition_spec_pb_roundtrip_preserves_order() {
        let spec =
            ResolvedPartitionSpec::new(keys(&["region", "date"]), values(&["US", "2024-01-15"]))
                .unwrap();

        assert_eq!(ResolvedPartitionSpec::from_pb(&spec.to_pb()), spec);
    }

    #[test]
    fn test_partition_info() {
        let spec = ResolvedPartitionSpec::new(keys(&["date"]), values(&["2024-01-15"])).unwrap();

        let info = PartitionInfo::new(42, spec);
        assert_eq!(info.get_partition_id(), 42);
        assert_eq!(info.get_partition_name(), "2024-01-15");
        assert_eq!(info.to_string(), "Partition{name='2024-01-15', id=42}");
    }

    #[test]
    fn test_table_partition() {
        let tp = TablePartition::new(100, 42);
        assert_eq!(tp.get_table_id(), 100);
        assert_eq!(tp.get_partition_id(), 42);
        assert_eq!(
            tp.to_string(),
            "TablePartition{tableId=100, partitionId=42}"
        );
    }

    #[test]
    fn test_partition_spec_pb_roundtrip() {
        let mut map = HashMap::new();
        map.insert("date".to_string(), "2024-01-15".to_string());
        map.insert("region".to_string(), "US".to_string());
        let spec = PartitionSpec::new(map);

        let restored = PartitionSpec::from_pb(&spec.to_pb());

        // Compare whole specs so a dropped or altered entry is caught, not just one key.
        assert_eq!(spec, restored);
    }

    #[test]
    fn test_partition_info_pb_roundtrip() {
        let info = PartitionInfo::new(42, date_region_spec());

        let restored = PartitionInfo::from_pb(&info.to_pb());

        assert_eq!(info, restored);
    }

    #[test]
    fn test_contains() {
        let full_spec = date_region_spec();

        let same_date =
            ResolvedPartitionSpec::new(keys(&["date"]), values(&["2024-01-15"])).unwrap();
        assert!(full_spec.contains(&same_date).unwrap());

        let other_region = ResolvedPartitionSpec::new(keys(&["region"]), values(&["EU"])).unwrap();
        assert!(!full_spec.contains(&other_region).unwrap());

        let unknown_key = ResolvedPartitionSpec::new(keys(&["bucket"]), values(&["0"])).unwrap();
        assert!(full_spec.contains(&unknown_key).is_err());
    }
}