mysql_binlog_connector_rust/command/
gtid_set.rs

1use std::{cmp, collections::HashMap, fmt::Display};
2
3use crate::binlog_error::BinlogError;
4
/// A run of consecutive transaction ids, stored as its first and last member.
///
/// Both bounds are inclusive: a single transaction `n` is represented as
/// `start == end == n`.
#[derive(Clone, Debug, PartialEq, Eq, Hash)]
pub struct Interval {
    /// First transaction id covered by the interval (inclusive).
    pub start: u64,
    /// Last transaction id covered by the interval (inclusive).
    pub end: u64,
}
10
/// A GTID set: for every source UUID, the transaction intervals recorded for it.
#[derive(Debug, PartialEq, Eq)]
pub struct GtidSet {
    /// Per-source sets, keyed by the source UUID string.
    pub map: HashMap<String, UuidSet>,
}
15
/// The transaction intervals recorded for one source UUID.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct UuidSet {
    /// Source UUID the intervals belong to.
    pub uuid: String,
    /// Intervals of transaction ids; `UuidSet::add` binary-searches this list,
    /// so it is expected to be ordered by ascending `start`.
    pub intervals: Vec<Interval>,
}
21
22impl Interval {
23    pub fn new(start: u64, end: u64) -> Self {
24        Self { start, end }
25    }
26
27    pub fn is_contained_within(&self, other: &Interval) -> bool {
28        self.start >= other.start && self.end <= other.end
29    }
30}
31
32impl UuidSet {
33    pub fn new(uuid: String, intervals: Vec<Interval>) -> Self {
34        let mut me = Self { uuid, intervals };
35        if me.intervals.len() > 1 {
36            me.join_adjacent_intervals(0)
37        }
38        me
39    }
40
41    pub fn add(&mut self, transaction_id: u64) -> bool {
42        let index = self.find_interval(transaction_id);
43        let mut added_to_existing = false;
44
45        if index < self.intervals.len() {
46            let interval = &mut self.intervals[index];
47            if interval.start == transaction_id + 1 {
48                interval.start = transaction_id;
49                added_to_existing = true;
50            } else if interval.end + 1 == transaction_id {
51                interval.end = transaction_id;
52                added_to_existing = true;
53            } else if interval.start <= transaction_id && transaction_id <= interval.end {
54                return false;
55            }
56        }
57
58        if !added_to_existing {
59            self.intervals.insert(
60                index,
61                Interval {
62                    start: transaction_id,
63                    end: transaction_id,
64                },
65            );
66        }
67
68        if self.intervals.len() > 1 {
69            self.join_adjacent_intervals(index);
70        }
71        true
72    }
73
74    fn is_contained_within(&self, other: &UuidSet) -> bool {
75        if self.uuid != other.uuid {
76            return false;
77        }
78
79        // every interval in this must be within an interval of the other
80        for i in self.intervals.iter() {
81            let mut found = false;
82            for o in other.intervals.iter() {
83                if i.is_contained_within(o) {
84                    found = true;
85                    break;
86                }
87            }
88            if !found {
89                return false;
90            }
91        }
92        true
93    }
94
95    fn join_adjacent_intervals(&mut self, index: usize) {
96        if self.intervals.is_empty() {
97            return;
98        }
99
100        let mut i = cmp::min(index + 1, self.intervals.len() - 1);
101        let mut end = 0;
102        if index >= 1 {
103            end = index - 1;
104        }
105
106        while i > end {
107            if self.intervals[i - 1].end + 1 == self.intervals[i].start {
108                self.intervals[i - 1].end = self.intervals[i].end;
109                self.intervals.remove(i);
110            }
111            i -= 1;
112        }
113    }
114
115    fn find_interval(&self, v: u64) -> usize {
116        let mut l = 0;
117        let mut r = self.intervals.len();
118        let mut p = 0;
119
120        while l < r {
121            p = (l + r) / 2;
122            let i = &self.intervals[p];
123            if i.end < v {
124                l = p + 1;
125            } else if v < i.start {
126                r = p;
127            } else {
128                return p;
129            }
130        }
131
132        if !self.intervals.is_empty() && self.intervals[p].end < v {
133            p += 1;
134        }
135        p
136    }
137}
138
139impl Display for UuidSet {
140    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
141        let mut result = self.uuid.clone();
142        for interval in &self.intervals {
143            result += &format!(":{}-{}", interval.start, interval.end);
144        }
145        write!(f, "{}", result)
146    }
147}
148
149// refer to: https://dev.mysql.com/doc/refman/8.0/en/replication-gtids-concepts.html
150impl GtidSet {
151    pub fn new(gtid_set: &str) -> Result<Self, BinlogError> {
152        let mut map: HashMap<String, UuidSet> = HashMap::new();
153        // 6d3960f6-4b36-11ef-8614-0242ac110002:1-5:7-10:12,
154        // 787d08c4-4b36-11ef-8614-0242ac110006:1-5
155        let lines = gtid_set.replace('\n', "");
156        let uuid_sets: Vec<&str> = lines.split(',').collect();
157
158        for uuid_set in uuid_sets {
159            if uuid_set.is_empty() {
160                continue;
161            }
162
163            let parts: Vec<&str> = uuid_set.split(':').collect();
164            if parts.len() < 2 {
165                return Err(BinlogError::InvalidGtid(uuid_set.to_string()));
166            }
167
168            let source_id = parts[0].to_string();
169            let mut intervals = vec![];
170            for interval_str in parts[1..].iter() {
171                let interval_parts: Vec<&str> = interval_str.split('-').collect();
172                if interval_parts.is_empty() {
173                    return Err(BinlogError::InvalidGtid(uuid_set.to_string()));
174                }
175
176                let start = Self::parse_interval_num(interval_parts[0], uuid_set)?;
177                let end = if interval_parts.len() > 1 {
178                    Self::parse_interval_num(interval_parts[1], uuid_set)?
179                } else {
180                    start
181                };
182                intervals.push(Interval { start, end });
183            }
184            map.insert(source_id.clone(), UuidSet::new(source_id, intervals));
185        }
186        Ok(GtidSet { map })
187    }
188
189    pub fn add(&mut self, gtid: &str) -> Result<bool, BinlogError> {
190        let split: Vec<&str> = gtid.split(':').collect();
191        if split.len() != 2 {
192            return Err(BinlogError::InvalidGtid(gtid.to_string()));
193        }
194
195        let source_id = split[0];
196        if let Ok(transaction_id) = split[1].parse::<u64>() {
197            let uuid_set = self
198                .map
199                .entry(source_id.to_string())
200                .or_insert_with(|| UuidSet {
201                    uuid: source_id.to_string(),
202                    intervals: vec![],
203                });
204            Ok(uuid_set.add(transaction_id))
205        } else {
206            Err(BinlogError::InvalidGtid(gtid.to_string()))
207        }
208    }
209
210    pub fn get_uuid_sets(&self) -> Vec<&UuidSet> {
211        self.map.values().collect()
212    }
213
214    pub fn put_uuid_set(&mut self, uuid_set: UuidSet) {
215        self.map.insert(uuid_set.uuid.clone(), uuid_set);
216    }
217
218    pub fn is_contained_within(&self, other: &GtidSet) -> bool {
219        for (uuid, i_set) in self.map.iter() {
220            if let Some(o_set) = other.map.get(uuid) {
221                if !i_set.is_contained_within(o_set) {
222                    return false;
223                }
224            } else {
225                return false;
226            }
227        }
228        true
229    }
230
231    fn parse_interval_num(interval_num_str: &str, uuid_set: &str) -> Result<u64, BinlogError> {
232        if let Ok(num) = interval_num_str.parse::<u64>() {
233            Ok(num)
234        } else {
235            Err(BinlogError::InvalidGtid(uuid_set.to_string()))
236        }
237    }
238}
239
240impl Display for GtidSet {
241    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
242        let mut gtids = vec![];
243        let mut uuids: Vec<_> = self.map.keys().collect();
244        uuids.sort();
245
246        for key in uuids {
247            let uuid_set = self.map.get(key).unwrap();
248            gtids.push(uuid_set.to_string());
249        }
250        write!(f, "{}", gtids.join(","))
251    }
252}
253
// Unit tests for parsing, adding to, comparing and rendering GTID sets.
#[cfg(test)]
mod tests {

    use crate::command::gtid_set::Interval;

    use super::GtidSet;

    // Source UUID shared by the tests that need only one source.
    const UUID: &'static str = "24bc7850-2c16-11e6-a073-0242ac110002";

    // Adding single transactions extends a touching interval, is a no-op for
    // ids already covered, and creates new intervals / new UUID entries otherwise.
    #[test]
    fn test_add() {
        let mut gtid_set = GtidSet::new("00000000-0000-0000-0000-000000000000:3-5").unwrap();
        gtid_set
            .add("00000000-0000-0000-0000-000000000000:2")
            .unwrap();
        gtid_set
            .add("00000000-0000-0000-0000-000000000000:4")
            .unwrap();
        gtid_set
            .add("00000000-0000-0000-0000-000000000000:5")
            .unwrap();
        gtid_set
            .add("00000000-0000-0000-0000-000000000000:7")
            .unwrap();
        gtid_set
            .add("00000000-0000-0000-0000-000000000001:9")
            .unwrap();
        gtid_set
            .add("00000000-0000-0000-0000-000000000000:0")
            .unwrap();
        assert_eq!(gtid_set.to_string(),
            "00000000-0000-0000-0000-000000000000:0-0:2-5:7-7,00000000-0000-0000-0000-000000000001:9-9");
    }

    // Adding the one missing id between two intervals fuses them into one.
    #[test]
    fn test_join() {
        let mut gtid_set = GtidSet::new("00000000-0000-0000-0000-000000000000:3-4:6-7").unwrap();
        gtid_set
            .add("00000000-0000-0000-0000-000000000000:5")
            .unwrap();
        let first_interval = gtid_set
            .get_uuid_sets()
            .first()
            .unwrap()
            .intervals
            .first()
            .unwrap();
        assert_eq!(first_interval.end, 7);
        assert_eq!(
            gtid_set.to_string(),
            "00000000-0000-0000-0000-000000000000:3-7"
        );
    }

    // The empty string parses to an empty set, which renders as the empty string.
    #[test]
    fn test_empty_set() {
        assert_eq!(GtidSet::new("").unwrap().to_string(), "");
    }

    // Equality is structural after parsing: adjacent intervals compare equal
    // to their merged form, intervals separated by a gap do not.
    #[test]
    fn test_equals() {
        assert_eq!(GtidSet::new("").unwrap(), GtidSet::new("").unwrap());
        assert_eq!(
            GtidSet::new(&format!("{}:1-191", UUID)).unwrap(),
            GtidSet::new(&format!("{}:1-191", UUID)).unwrap()
        );
        assert_eq!(
            GtidSet::new(&format!("{}:1-191:192-199", UUID)).unwrap(),
            GtidSet::new(&format!("{}:1-191:192-199", UUID)).unwrap()
        );
        assert_eq!(
            GtidSet::new(&format!("{}:1-191:192-199", UUID)).unwrap(),
            GtidSet::new(&format!("{}:1-199", UUID)).unwrap()
        );
        assert_eq!(
            GtidSet::new(&format!("{}:1-191:193-199", UUID)).unwrap(),
            GtidSet::new(&format!("{}:1-191:193-199", UUID)).unwrap()
        );
        assert_ne!(
            GtidSet::new(&format!("{}:1-191:193-199", UUID)).unwrap(),
            GtidSet::new(&format!("{}:1-199", UUID)).unwrap()
        );
    }

    // Checks `is_contained_within` for every ordered pair of the sets below.
    #[test]
    fn test_subset_of() {
        let set = vec![
            GtidSet::new("").unwrap(),
            GtidSet::new(&format!("{}:1-191", UUID)).unwrap(),
            GtidSet::new(&format!("{}:192-199", UUID)).unwrap(),
            GtidSet::new(&format!("{}:1-191:192-199", UUID)).unwrap(),
            GtidSet::new(&format!("{}:1-191:193-199", UUID)).unwrap(),
            GtidSet::new(&format!("{}:2-199", UUID)).unwrap(),
            GtidSet::new(&format!("{}:1-200", UUID)).unwrap(),
        ];

        // subset_matrix[i][j] == 1 means set[i] is expected to be contained in set[j].
        let subset_matrix = &[
            &[1, 1, 1, 1, 1, 1, 1],
            &[0, 1, 0, 1, 1, 0, 1],
            &[0, 0, 1, 1, 0, 1, 1],
            &[0, 0, 0, 1, 0, 0, 1],
            &[0, 0, 0, 1, 1, 0, 1],
            &[0, 0, 0, 1, 0, 1, 1],
            &[0, 0, 0, 0, 0, 0, 1],
        ];

        for (i, subset) in subset_matrix.iter().enumerate() {
            for (j, &is_subset) in subset.iter().enumerate() {
                assert_eq!(
                    set[i].is_contained_within(&set[j]),
                    is_subset == 1,
                    "\"{:?}\" was expected to be a subset of \"{:?}\" ({},{})",
                    set[i],
                    set[j],
                    i,
                    j
                );
            }
        }
    }

    // A single `start-end` range parses to exactly one interval and round-trips.
    #[test]
    fn test_single_interval() {
        let gtid_set = GtidSet::new(&format!("{}:1-191", UUID)).unwrap();
        let uuid_set = gtid_set.map.get(UUID).unwrap();
        assert_eq!(uuid_set.intervals.len(), 1);
        assert!(uuid_set.intervals.contains(&Interval::new(1, 191)));
        assert_eq!(
            uuid_set.intervals.iter().next(),
            Some(&Interval::new(1, 191))
        );
        assert_eq!(uuid_set.intervals.last(), Some(&Interval::new(1, 191)));
        assert_eq!(gtid_set.to_string(), format!("{}:1-191", UUID));
    }

    // Two ranges that touch (191 then 192) are merged while parsing.
    #[test]
    fn test_collapse_adjacent_intervals() {
        let gtid_set = GtidSet::new(&format!("{}:1-191:192-199", UUID)).unwrap();
        let uuid_set = gtid_set.map.get(UUID).unwrap();
        assert_eq!(uuid_set.intervals.len(), 1);
        assert!(uuid_set.intervals.contains(&Interval::new(1, 199)));
        assert_eq!(
            uuid_set.intervals.iter().next(),
            Some(&Interval::new(1, 199))
        );
        assert_eq!(uuid_set.intervals.last(), Some(&Interval::new(1, 199)));
        assert_eq!(gtid_set.to_string(), format!("{}:1-199", UUID));
    }

    // Ranges separated by a gap (192 is missing) stay separate.
    #[test]
    fn test_not_collapse_non_adjacent_intervals() {
        let gtid_set = GtidSet::new(&format!("{}:1-191:193-199", UUID)).unwrap();
        let uuid_set = gtid_set.map.get(UUID).unwrap();
        assert_eq!(uuid_set.intervals.len(), 2);
        assert_eq!(
            uuid_set.intervals.iter().next(),
            Some(&Interval::new(1, 191))
        );
        assert_eq!(uuid_set.intervals.last(), Some(&Interval::new(193, 199)));
        assert_eq!(gtid_set.to_string(), format!("{}:1-191:193-199", UUID));
    }

    // Three disjoint ranges are kept in input order and round-trip unchanged.
    #[test]
    fn test_multiple_intervals() {
        let gtid_set = GtidSet::new(&format!("{}:1-191:193-199:1000-1033", UUID)).unwrap();
        let uuid_set = gtid_set.map.get(UUID).unwrap();
        assert_eq!(uuid_set.intervals.len(), 3);
        assert!(uuid_set.intervals.contains(&Interval::new(193, 199)));
        assert_eq!(uuid_set.intervals.first(), Some(&Interval::new(1, 191)));
        assert_eq!(uuid_set.intervals.last(), Some(&Interval::new(1000, 1033)));
        assert_eq!(
            gtid_set.to_string(),
            format!("{}:1-191:193-199:1000-1033", UUID)
        );
    }

    // Only the touching pair (1-191, 192-199) merges; the ranges with gaps remain.
    #[test]
    fn test_multiple_intervals_that_may_be_adjacent() {
        let gtid_set = GtidSet::new(&format!(
            "{}:1-191:192-199:1000-1033:1035-1036:1038-1039",
            UUID
        ))
        .unwrap();
        let uuid_set = gtid_set.map.get(UUID).unwrap();
        assert_eq!(uuid_set.intervals.len(), 4);
        assert!(uuid_set.intervals.contains(&Interval::new(1000, 1033)));
        assert!(uuid_set.intervals.contains(&Interval::new(1035, 1036)));
        assert_eq!(uuid_set.intervals.first(), Some(&Interval::new(1, 199)));
        assert_eq!(uuid_set.intervals.last(), Some(&Interval::new(1038, 1039)));
        assert_eq!(
            gtid_set.to_string(),
            format!("{}:1-199:1000-1033:1035-1036:1038-1039", UUID)
        );
    }

    // `put_uuid_set` replaces the existing entry for the same UUID.
    #[test]
    fn test_put_uuid_set() {
        let mut gtid_set = GtidSet::new(&format!("{}:1-191", UUID)).unwrap();
        let gtid_set2 = GtidSet::new(&format!("{}:1-190", UUID)).unwrap();
        let uuid_set2 = gtid_set2.map.get(UUID).unwrap();
        gtid_set.put_uuid_set(uuid_set2.clone());
        assert_eq!(gtid_set, gtid_set2);
    }
}