1#![allow(clippy::unnecessary_literal_bound)]
18
19use std::cmp::Ordering;
20use std::collections::HashMap;
21use std::sync::Arc;
22
23use tracing::{debug, info};
24
/// A named ordering over raw byte strings.
///
/// Implementations are stored as `Arc<dyn CollationFunction>` inside
/// [`CollationRegistry`], which is why the trait requires `Send + Sync` and
/// must stay object-safe.
pub trait CollationFunction: Send + Sync {
    /// Returns the name this collation is registered under.
    ///
    /// [`CollationRegistry::register`] ASCII-upper-cases this value to build
    /// the lookup key, so the case used here does not matter for lookups.
    fn name(&self) -> &str;

    /// Compares `left` against `right` and reports how `left` orders
    /// relative to `right`.
    ///
    /// Both operands are plain bytes; nothing in this signature requires
    /// them to be valid UTF-8.
    fn compare(&self, left: &[u8], right: &[u8]) -> Ordering;
}
40
41pub struct BinaryCollation;
48
49impl CollationFunction for BinaryCollation {
50 fn name(&self) -> &str {
51 "BINARY"
52 }
53
54 fn compare(&self, left: &[u8], right: &[u8]) -> Ordering {
55 left.cmp(right)
56 }
57}
58
59pub struct NoCaseCollation;
64
65impl CollationFunction for NoCaseCollation {
66 fn name(&self) -> &str {
67 "NOCASE"
68 }
69
70 fn compare(&self, left: &[u8], right: &[u8]) -> Ordering {
71 let l = left.iter().map(u8::to_ascii_uppercase);
72 let r = right.iter().map(u8::to_ascii_uppercase);
73 l.cmp(r)
74 }
75}
76
77pub struct RtrimCollation;
82
83impl CollationFunction for RtrimCollation {
84 fn name(&self) -> &str {
85 "RTRIM"
86 }
87
88 fn compare(&self, left: &[u8], right: &[u8]) -> Ordering {
89 let l = strip_trailing_spaces(left);
90 let r = strip_trailing_spaces(right);
91 l.cmp(r)
92 }
93}
94
/// Returns the prefix of `s` that remains after removing every trailing
/// ASCII space (`0x20`).
///
/// Only the space byte is trimmed; tabs and other whitespace are kept. An
/// input that is empty or consists solely of spaces yields an empty slice.
fn strip_trailing_spaces(s: &[u8]) -> &[u8] {
    // Index one past the last non-space byte, or 0 when there is none.
    let keep = s
        .iter()
        .rposition(|&byte| byte != b' ')
        .map_or(0, |last| last + 1);
    &s[..keep]
}
102
103#[derive(Clone)]
110pub struct CollationRegistry {
111 collations: HashMap<String, Arc<dyn CollationFunction>>,
112}
113
114impl std::fmt::Debug for CollationRegistry {
115 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
116 let mut names = self.collations.keys().cloned().collect::<Vec<_>>();
117 names.sort_unstable();
118 f.debug_struct("CollationRegistry")
119 .field("collations", &names)
120 .finish()
121 }
122}
123
124impl Default for CollationRegistry {
125 fn default() -> Self {
126 Self::new()
127 }
128}
129
130impl CollationRegistry {
131 #[must_use]
133 pub fn new() -> Self {
134 let mut collations = HashMap::with_capacity(3);
135 collations.insert(
136 "BINARY".to_owned(),
137 Arc::new(BinaryCollation) as Arc<dyn CollationFunction>,
138 );
139 collations.insert(
140 "NOCASE".to_owned(),
141 Arc::new(NoCaseCollation) as Arc<dyn CollationFunction>,
142 );
143 collations.insert(
144 "RTRIM".to_owned(),
145 Arc::new(RtrimCollation) as Arc<dyn CollationFunction>,
146 );
147 Self { collations }
148 }
149
150 pub fn register<C: CollationFunction + 'static>(
155 &mut self,
156 collation: C,
157 ) -> Option<Arc<dyn CollationFunction>> {
158 let name = collation.name().to_ascii_uppercase();
159 info!(collation_name = %name, deterministic = true, "custom collation registration");
160 self.collations.insert(name, Arc::new(collation))
161 }
162
163 #[must_use]
167 pub fn find(&self, name: &str) -> Option<Arc<dyn CollationFunction>> {
168 let canon = name.to_ascii_uppercase();
169 let result = self.collations.get(&canon).cloned();
170 debug!(
171 collation = %canon,
172 hit = result.is_some(),
173 "collation registry lookup"
174 );
175 result
176 }
177
178 #[must_use]
180 pub fn contains(&self, name: &str) -> bool {
181 self.collations.contains_key(&name.to_ascii_uppercase())
182 }
183
184 #[must_use]
190 pub fn names(&self) -> Vec<String> {
191 let mut names = vec!["BINARY".to_owned(), "NOCASE".to_owned(), "RTRIM".to_owned()];
192 let mut custom: Vec<String> = self
193 .collations
194 .keys()
195 .filter(|name| !matches!(name.as_str(), "BINARY" | "NOCASE" | "RTRIM"))
196 .cloned()
197 .collect();
198 custom.sort_unstable_by_key(|name| name.to_ascii_uppercase());
199 names.extend(custom);
200 names
201 }
202}
203
/// Where an operand's collation came from.
///
/// [`resolve_collation`] ranks the variants `Explicit` > `Schema` >
/// `Default` when two operands disagree.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum CollationSource {
    /// Highest precedence. Presumably set when the expression names its
    /// collation directly (e.g. a `COLLATE` clause) — confirm with the
    /// code that builds annotations.
    Explicit,
    /// Middle precedence. Presumably a collation declared in the schema,
    /// such as on a column definition — confirm with the caller.
    Schema,
    /// Lowest precedence: no collation was specified for the operand.
    Default,
}
219
220#[derive(Debug, Clone)]
222pub struct CollationAnnotation {
223 pub name: String,
225 pub source: CollationSource,
227}
228
229#[must_use]
239pub fn resolve_collation(lhs: &CollationAnnotation, rhs: &CollationAnnotation) -> String {
240 let result = match (lhs.source, rhs.source) {
242 (_, CollationSource::Explicit) if lhs.source != CollationSource::Explicit => &rhs.name,
243 (CollationSource::Default, CollationSource::Schema) => &rhs.name,
244 _ => &lhs.name,
245 };
246 debug!(
247 collation = %result,
248 lhs_source = ?lhs.source,
249 rhs_source = ?rhs.source,
250 context = "COMPARE",
251 "collation selection"
252 );
253 result.clone()
254}
255
#[cfg(test)]
mod tests {
    use super::*;

    // ── Built-in collation functions ────────────────────────────────────

    /// BINARY is a plain byte comparison, also for multi-byte UTF-8 text.
    #[test]
    fn test_collation_binary_memcmp() {
        let binary = BinaryCollation;
        let expectations: [(&[u8], &[u8], Ordering); 4] = [
            (b"abc", b"abc", Ordering::Equal),
            (b"abc", b"abd", Ordering::Less),
            (b"abd", b"abc", Ordering::Greater),
            (b"ABC", b"abc", Ordering::Less),
        ];
        for (left, right, expected) in expectations {
            assert_eq!(binary.compare(left, right), expected);
        }

        // Identical UTF-8 encodings are equal; accents are never folded away.
        assert_eq!(
            binary.compare("café".as_bytes(), "café".as_bytes()),
            Ordering::Equal
        );
        assert_ne!(binary.compare("über".as_bytes(), b"uber"), Ordering::Equal);
    }

    #[test]
    fn test_collation_binary_basic() {
        let binary = BinaryCollation;
        // Upper-case ASCII letters have smaller byte values than lower-case.
        assert_eq!(binary.compare(b"ABC", b"abc"), Ordering::Less);
        assert_eq!(binary.compare(b"\x00", b"\x01"), Ordering::Less);
        // Bytes are unsigned: 0xFF is the largest value, not a negative one.
        assert_eq!(binary.compare(b"\xff", b"\x00"), Ordering::Greater);
    }

    #[test]
    fn test_collation_nocase_ascii() {
        let nocase = NoCaseCollation;
        assert_eq!(nocase.compare(b"ABC", b"abc"), Ordering::Equal);
        assert_eq!(nocase.compare(b"Alice", b"alice"), Ordering::Equal);
        // `a` folds to `A` (0x41), which is below `[` (0x5B).
        assert_eq!(nocase.compare(b"[", b"a"), Ordering::Greater);
    }

    #[test]
    fn test_collation_nocase_ascii_only() {
        let nocase = NoCaseCollation;
        let upper_umlaut = "Ä".as_bytes();
        let lower_umlaut = "ä".as_bytes();
        assert_ne!(
            nocase.compare(upper_umlaut, lower_umlaut),
            Ordering::Equal,
            "NOCASE must NOT fold non-ASCII"
        );
        // The last letter of the alphabet still folds ...
        assert_eq!(nocase.compare(b"Z", b"z"), Ordering::Equal);
        // ... but the punctuation around the letter ranges is left alone.
        assert_eq!(nocase.compare(b"[", b"["), Ordering::Equal);
        assert_ne!(nocase.compare(b"[", b"{"), Ordering::Equal);
    }

    #[test]
    fn test_collation_rtrim() {
        let rtrim = RtrimCollation;
        // Trailing spaces are ignored on either side, or on both.
        assert_eq!(rtrim.compare(b"hello ", b"hello"), Ordering::Equal);
        assert_eq!(rtrim.compare(b"hello", b"hello "), Ordering::Equal);
        assert_eq!(rtrim.compare(b"hello ", b"hello "), Ordering::Equal);
        // Any other trailing byte remains significant.
        assert_ne!(rtrim.compare(b"hello!", b"hello"), Ordering::Equal);
        assert_ne!(rtrim.compare(b"hello ", b"hello!"), Ordering::Equal);
    }

    #[test]
    fn test_collation_rtrim_tabs_not_stripped() {
        let rtrim = RtrimCollation;
        assert_ne!(
            rtrim.compare(b"hello\t", b"hello"),
            Ordering::Equal,
            "RTRIM must NOT strip tabs"
        );
        // U+00A0 (no-break space) encoded as UTF-8.
        assert_ne!(
            rtrim.compare(b"hello\xc2\xa0", b"hello"),
            Ordering::Equal,
            "RTRIM must NOT strip non-breaking spaces"
        );
    }

    // ── Ordering properties ─────────────────────────────────────────────

    /// Swapping the operands must reverse the result for every built-in.
    #[test]
    fn test_collation_properties_antisymmetric() {
        let pairs: [(&[u8], &[u8]); 4] = [
            (b"abc", b"def"),
            (b"hello", b"world"),
            (b"ABC", b"abc"),
            (b"hello ", b"hello"),
        ];
        let collations: [Box<dyn CollationFunction>; 3] = [
            Box::new(BinaryCollation),
            Box::new(NoCaseCollation),
            Box::new(RtrimCollation),
        ];

        for coll in collations.iter() {
            for &(a, b) in pairs.iter() {
                let forward = coll.compare(a, b);
                let reverse = coll.compare(b, a);
                assert_eq!(
                    forward,
                    reverse.reverse(),
                    "{}: compare({:?}, {:?}) = {forward:?}, but reverse = {reverse:?}",
                    coll.name(),
                    std::str::from_utf8(a).unwrap_or("?"),
                    std::str::from_utf8(b).unwrap_or("?"),
                );
            }
        }
    }

    #[test]
    fn test_collation_properties_transitive() {
        let binary = BinaryCollation;
        let (a, b, c) = (b"apple", b"banana", b"cherry");

        // a < b and b < c ...
        assert_eq!(binary.compare(a, b), Ordering::Less);
        assert_eq!(binary.compare(b, c), Ordering::Less);
        // ... therefore a < c.
        assert_eq!(binary.compare(a, c), Ordering::Less);
    }

    /// Compile-time check: the built-ins can be shared across threads.
    #[test]
    fn test_collation_send_sync() {
        fn assert_send_sync<T: Send + Sync>() {}
        assert_send_sync::<BinaryCollation>();
        assert_send_sync::<NoCaseCollation>();
        assert_send_sync::<RtrimCollation>();
    }

    // ── Registry ────────────────────────────────────────────────────────

    #[test]
    fn test_registry_preloaded_builtins() {
        let registry = CollationRegistry::new();
        for builtin in ["BINARY", "NOCASE", "RTRIM"] {
            assert!(registry.contains(builtin));
        }

        let binary = registry
            .find("BINARY")
            .expect("BINARY must be pre-registered");
        assert_eq!(binary.compare(b"a", b"b"), Ordering::Less);

        let nocase = registry
            .find("NOCASE")
            .expect("NOCASE must be pre-registered");
        assert_eq!(nocase.compare(b"ABC", b"abc"), Ordering::Equal);

        let rtrim = registry
            .find("RTRIM")
            .expect("RTRIM must be pre-registered");
        assert_eq!(rtrim.compare(b"x ", b"x"), Ordering::Equal);
    }

    /// Custom collation that inverts the byte-wise order.
    struct ReverseCollation;

    impl CollationFunction for ReverseCollation {
        fn name(&self) -> &str {
            "REVERSE"
        }

        fn compare(&self, left: &[u8], right: &[u8]) -> Ordering {
            right.cmp(left)
        }
    }

    #[test]
    fn test_registry_custom_collation_registration() {
        let mut registry = CollationRegistry::new();

        let replaced = registry.register(ReverseCollation);
        assert!(replaced.is_none(), "no prior REVERSE collation");
        assert!(registry.contains("REVERSE"));

        let reverse = registry.find("reverse").expect("case-insensitive lookup");
        assert_eq!(reverse.compare(b"a", b"z"), Ordering::Greater);
    }

    /// Custom collation that deliberately reuses a built-in name.
    struct AlwaysEqualCollation;

    impl CollationFunction for AlwaysEqualCollation {
        fn name(&self) -> &str {
            "BINARY"
        }

        fn compare(&self, _left: &[u8], _right: &[u8]) -> Ordering {
            Ordering::Equal
        }
    }

    #[test]
    fn test_registry_overwrite_builtin() {
        let mut registry = CollationRegistry::new();

        let replaced = registry.register(AlwaysEqualCollation);
        assert!(
            replaced.is_some(),
            "should return previous BINARY collation"
        );

        let binary = registry.find("BINARY").unwrap();
        assert_eq!(
            binary.compare(b"a", b"z"),
            Ordering::Equal,
            "custom overwrite must take effect"
        );
    }

    #[test]
    fn test_registry_unregistered_returns_none() {
        let registry = CollationRegistry::new();
        assert!(registry.find("NONEXISTENT").is_none());
        assert!(!registry.contains("NONEXISTENT"));
    }

    #[test]
    fn test_registry_name_case_insensitive() {
        let registry = CollationRegistry::new();

        for spelling in ["BINARY", "binary", "Binary", "bInArY"] {
            assert!(registry.find(spelling).is_some());
        }
        for spelling in ["nocase", "NOCASE", "NoCase"] {
            assert!(registry.contains(spelling));
        }
    }

    // ── Collation selection ─────────────────────────────────────────────

    /// Shorthand for building an annotation in the selection tests.
    fn ann(name: &str, source: CollationSource) -> CollationAnnotation {
        CollationAnnotation {
            name: String::from(name),
            source,
        }
    }

    #[test]
    fn test_collation_selection_explicit_wins() {
        let lhs = ann("NOCASE", CollationSource::Explicit);
        let rhs = ann("BINARY", CollationSource::Default);
        assert_eq!(resolve_collation(&lhs, &rhs), "NOCASE");
    }

    #[test]
    fn test_collation_selection_explicit_rhs_wins_over_default() {
        let lhs = ann("BINARY", CollationSource::Default);
        let rhs = ann("RTRIM", CollationSource::Explicit);
        assert_eq!(resolve_collation(&lhs, &rhs), "RTRIM");
    }

    #[test]
    fn test_collation_selection_leftmost_explicit_wins() {
        let lhs = ann("NOCASE", CollationSource::Explicit);
        let rhs = ann("RTRIM", CollationSource::Explicit);
        assert_eq!(resolve_collation(&lhs, &rhs), "NOCASE");
    }

    #[test]
    fn test_collation_selection_schema_over_default() {
        let lhs = ann("NOCASE", CollationSource::Schema);
        let rhs = ann("BINARY", CollationSource::Default);
        assert_eq!(resolve_collation(&lhs, &rhs), "NOCASE");
    }

    #[test]
    fn test_collation_selection_schema_rhs_over_default() {
        let lhs = ann("BINARY", CollationSource::Default);
        let rhs = ann("NOCASE", CollationSource::Schema);
        assert_eq!(resolve_collation(&lhs, &rhs), "NOCASE");
    }

    #[test]
    fn test_collation_selection_explicit_over_schema() {
        let lhs = ann("RTRIM", CollationSource::Explicit);
        let rhs = ann("NOCASE", CollationSource::Schema);
        assert_eq!(resolve_collation(&lhs, &rhs), "RTRIM");
    }

    #[test]
    fn test_collation_selection_default_binary() {
        let lhs = ann("BINARY", CollationSource::Default);
        let rhs = ann("BINARY", CollationSource::Default);
        assert_eq!(resolve_collation(&lhs, &rhs), "BINARY");
    }

    // ── Collation-aware operations ──────────────────────────────────────

    #[test]
    fn test_min_respects_collation() {
        let binary = BinaryCollation;
        // Under BINARY the upper-case spelling is strictly smaller.
        let binary_min = match binary.compare(b"ABC", b"abc") {
            Ordering::Less => "ABC",
            _ => "abc",
        };
        assert_eq!(binary_min, "ABC");

        let nocase = NoCaseCollation;
        // Under NOCASE the two spellings tie.
        assert_eq!(nocase.compare(b"ABC", b"abc"), Ordering::Equal);
    }

    #[test]
    fn test_max_respects_collation() {
        let binary = BinaryCollation;
        let binary_max = match binary.compare(b"abc", b"ABC") {
            Ordering::Greater => "abc",
            _ => "ABC",
        };
        assert_eq!(binary_max, "abc");
    }

    #[test]
    fn test_collation_aware_sort() {
        let nocase = NoCaseCollation;
        let mut words: Vec<&[u8]> = vec![b"Banana", b"apple", b"Cherry", b"date"];
        words.sort_by(|a, b| nocase.compare(a, b));

        // Alphabetical regardless of case; a BINARY sort would put the
        // capitalised words first.
        assert_eq!(words[0], b"apple");
        assert_eq!(words[1], b"Banana");
        assert_eq!(words[2], b"Cherry");
        assert_eq!(words[3], b"date");
    }

    #[test]
    fn test_collation_aware_group_by() {
        let nocase = NoCaseCollation;
        let mut sorted: Vec<&[u8]> = vec![b"ABC", b"abc", b"Abc", b"def", b"DEF"];
        // Sort first so that equal keys end up adjacent.
        sorted.sort_by(|a, b| nocase.compare(a, b));

        let mut groups: Vec<Vec<&[u8]>> = Vec::new();
        let mut open_group: Vec<&[u8]> = vec![sorted[0]];
        for pair in sorted.windows(2) {
            let (previous, current) = (pair[0], pair[1]);
            if nocase.compare(previous, current) != Ordering::Equal {
                // Key changed: close the group collected so far.
                groups.push(std::mem::take(&mut open_group));
            }
            open_group.push(current);
        }
        groups.push(open_group);

        // {ABC, abc, Abc} and {def, DEF}.
        assert_eq!(groups.len(), 2);
        assert_eq!(groups[0].len(), 3);
        assert_eq!(groups[1].len(), 2);
    }

    #[test]
    fn test_collation_aware_distinct() {
        let nocase = NoCaseCollation;
        let items: Vec<&[u8]> = vec![b"ABC", b"abc", b"Abc", b"def", b"DEF"];

        let mut distinct: Vec<&[u8]> = Vec::new();
        for item in &items {
            let is_new = !distinct
                .iter()
                .any(|seen| nocase.compare(seen, item) == Ordering::Equal);
            if is_new {
                distinct.push(item);
            }
        }

        // One representative each for "abc" and "def".
        assert_eq!(distinct.len(), 2);
    }

    // ── Trait impls on the public types ─────────────────────────────────

    #[test]
    fn test_registry_default_impl() {
        let registry = CollationRegistry::default();
        // `Default` must yield the same pre-loaded registry as `new`.
        for builtin in ["BINARY", "NOCASE", "RTRIM"] {
            assert!(registry.contains(builtin));
        }
    }

    #[test]
    fn test_collation_annotation_debug() {
        let ann = CollationAnnotation {
            name: String::from("NOCASE"),
            source: CollationSource::Explicit,
        };
        let debug_str = format!("{ann:?}");
        assert!(debug_str.contains("NOCASE"));
        assert!(debug_str.contains("Explicit"));
    }

    #[test]
    fn test_collation_source_equality() {
        assert_eq!(CollationSource::Explicit, CollationSource::Explicit);
        assert_ne!(CollationSource::Explicit, CollationSource::Schema);
        assert_ne!(CollationSource::Schema, CollationSource::Default);
    }
}