1#![allow(clippy::unnecessary_literal_bound)]
18
19use std::cmp::Ordering;
20use std::collections::HashMap;
21use std::sync::{Arc, OnceLock};
22
23use tracing::{debug, info};
24
/// A named comparison function over raw byte strings.
///
/// The `Send + Sync` supertraits let a single instance be shared between
/// threads; elsewhere in this file implementations are handed out as
/// `Arc<dyn CollationFunction>`.
pub trait CollationFunction: Send + Sync {
    /// Name of this collation.
    ///
    /// `CollationRegistry::register` upper-cases this name (ASCII only) to
    /// build the key under which the collation is stored.
    fn name(&self) -> &str;

    /// Reports how `left` orders relative to `right` under this collation.
    fn compare(&self, left: &[u8], right: &[u8]) -> Ordering;
}
40
41pub struct BinaryCollation;
48
49impl CollationFunction for BinaryCollation {
50 fn name(&self) -> &str {
51 "BINARY"
52 }
53
54 fn compare(&self, left: &[u8], right: &[u8]) -> Ordering {
55 left.cmp(right)
56 }
57}
58
59pub struct NoCaseCollation;
64
65impl CollationFunction for NoCaseCollation {
66 fn name(&self) -> &str {
67 "NOCASE"
68 }
69
70 fn compare(&self, left: &[u8], right: &[u8]) -> Ordering {
71 let l = left.iter().map(u8::to_ascii_uppercase);
72 let r = right.iter().map(u8::to_ascii_uppercase);
73 l.cmp(r)
74 }
75}
76
77pub struct RtrimCollation;
82
83impl CollationFunction for RtrimCollation {
84 fn name(&self) -> &str {
85 "RTRIM"
86 }
87
88 fn compare(&self, left: &[u8], right: &[u8]) -> Ordering {
89 let l = strip_trailing_spaces(left);
90 let r = strip_trailing_spaces(right);
91 l.cmp(r)
92 }
93}
94
/// Returns the prefix of `s` that remains after removing every trailing
/// `b' '` (0x20) byte.
///
/// No other byte is treated as whitespace. An input consisting solely of
/// spaces yields an empty slice.
fn strip_trailing_spaces(s: &[u8]) -> &[u8] {
    let mut trimmed = s;
    // Peel one trailing space per iteration until the last byte is not a space.
    while let [head @ .., b' '] = trimmed {
        trimmed = head;
    }
    trimmed
}
102
103fn builtin_collation(name: &str) -> Option<Arc<dyn CollationFunction>> {
104 type BuiltinCollations = (
105 Arc<dyn CollationFunction>,
106 Arc<dyn CollationFunction>,
107 Arc<dyn CollationFunction>,
108 );
109
110 static BUILTINS: OnceLock<BuiltinCollations> = OnceLock::new();
111 let (binary, nocase, rtrim) = BUILTINS.get_or_init(|| {
112 (
113 Arc::new(BinaryCollation) as Arc<dyn CollationFunction>,
114 Arc::new(NoCaseCollation) as Arc<dyn CollationFunction>,
115 Arc::new(RtrimCollation) as Arc<dyn CollationFunction>,
116 )
117 });
118 match name {
119 "BINARY" => Some(Arc::clone(binary)),
120 "NOCASE" => Some(Arc::clone(nocase)),
121 "RTRIM" => Some(Arc::clone(rtrim)),
122 _ => None,
123 }
124}
125
126#[derive(Clone)]
133pub struct CollationRegistry {
134 custom_collations: HashMap<String, Arc<dyn CollationFunction>>,
135}
136
137impl std::fmt::Debug for CollationRegistry {
138 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
139 f.debug_struct("CollationRegistry")
140 .field("collations", &self.names())
141 .finish()
142 }
143}
144
145impl Default for CollationRegistry {
146 fn default() -> Self {
147 Self::new()
148 }
149}
150
151impl CollationRegistry {
152 #[must_use]
154 pub fn new() -> Self {
155 Self {
156 custom_collations: HashMap::new(),
157 }
158 }
159
160 pub fn register<C: CollationFunction + 'static>(
165 &mut self,
166 collation: C,
167 ) -> Option<Arc<dyn CollationFunction>> {
168 let name = collation.name().to_ascii_uppercase();
169 info!(collation_name = %name, deterministic = true, "custom collation registration");
170 self.custom_collations
171 .insert(name.clone(), Arc::new(collation))
172 .or_else(|| builtin_collation(&name))
173 }
174
175 #[must_use]
179 pub fn find(&self, name: &str) -> Option<Arc<dyn CollationFunction>> {
180 let canon = name.to_ascii_uppercase();
181 let result = self
182 .custom_collations
183 .get(&canon)
184 .cloned()
185 .or_else(|| builtin_collation(&canon));
186 debug!(
187 collation = %canon,
188 hit = result.is_some(),
189 "collation registry lookup"
190 );
191 result
192 }
193
194 #[must_use]
196 pub fn contains(&self, name: &str) -> bool {
197 let canon = name.to_ascii_uppercase();
198 self.custom_collations.contains_key(&canon) || builtin_collation(&canon).is_some()
199 }
200
201 #[must_use]
207 pub fn names(&self) -> Vec<String> {
208 let mut names = vec!["BINARY".to_owned(), "NOCASE".to_owned(), "RTRIM".to_owned()];
209 let mut custom: Vec<String> = self
210 .custom_collations
211 .keys()
212 .filter(|name| !matches!(name.as_str(), "BINARY" | "NOCASE" | "RTRIM"))
213 .cloned()
214 .collect();
215 custom.sort_unstable_by_key(|name| name.to_ascii_uppercase());
216 names.extend(custom);
217 names
218 }
219}
220
/// Where a collation annotation on an operand came from.
///
/// `resolve_collation` ranks the variants `Explicit` over `Schema` over
/// `Default` when two operands disagree.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum CollationSource {
    /// Highest precedence; presumably an explicit collation request in the
    /// expression itself (TODO confirm against callers).
    Explicit,
    /// Middle precedence; presumably a collation declared in the schema
    /// (TODO confirm against callers).
    Schema,
    /// Lowest precedence; the fallback when nothing else applies.
    Default,
}
236
237#[derive(Debug, Clone)]
239pub struct CollationAnnotation {
240 pub name: String,
242 pub source: CollationSource,
244}
245
246#[must_use]
256pub fn resolve_collation(lhs: &CollationAnnotation, rhs: &CollationAnnotation) -> String {
257 let result = match (lhs.source, rhs.source) {
259 (_, CollationSource::Explicit) if lhs.source != CollationSource::Explicit => &rhs.name,
260 (CollationSource::Default, CollationSource::Schema) => &rhs.name,
261 _ => &lhs.name,
262 };
263 debug!(
264 collation = %result,
265 lhs_source = ?lhs.source,
266 rhs_source = ?rhs.source,
267 context = "COMPARE",
268 "collation selection"
269 );
270 result.clone()
271}
272
#[cfg(test)]
mod tests {
    use super::*;

    // ---------------------------------------------------------------
    // Built-in collations
    // ---------------------------------------------------------------

    // BINARY orders by raw bytes: equal inputs are Equal, the first
    // differing byte decides, and upper-case ASCII sorts before lower-case.
    #[test]
    fn test_collation_binary_memcmp() {
        let coll = BinaryCollation;
        assert_eq!(coll.compare(b"abc", b"abc"), Ordering::Equal);
        assert_eq!(coll.compare(b"abc", b"abd"), Ordering::Less);
        assert_eq!(coll.compare(b"abd", b"abc"), Ordering::Greater);
        assert_eq!(coll.compare(b"ABC", b"abc"), Ordering::Less);
        // Identical multi-byte UTF-8 sequences compare equal byte for byte.
        assert_eq!(
            coll.compare("café".as_bytes(), "café".as_bytes()),
            Ordering::Equal
        );
        // No Unicode folding: "ü" and "u" are different byte sequences.
        assert_ne!(coll.compare("über".as_bytes(), b"uber"), Ordering::Equal);
    }

    // BINARY at the byte-value extremes.
    #[test]
    fn test_collation_binary_basic() {
        let coll = BinaryCollation;
        assert_eq!(coll.compare(b"ABC", b"abc"), Ordering::Less);
        // The lowest byte values order as expected.
        assert_eq!(coll.compare(b"\x00", b"\x01"), Ordering::Less);
        // 0xff sorts above 0x00.
        assert_eq!(coll.compare(b"\xff", b"\x00"), Ordering::Greater);
    }

    // NOCASE treats ASCII letters of either case as equal.
    #[test]
    fn test_collation_nocase_ascii() {
        let coll = NoCaseCollation;
        assert_eq!(coll.compare(b"ABC", b"abc"), Ordering::Equal);
        assert_eq!(coll.compare(b"Alice", b"alice"), Ordering::Equal);
        // Folding is to upper case, so '[' (0x5B) sorts after 'a' folded to 'A' (0x41).
        assert_eq!(coll.compare(b"[", b"a"), Ordering::Greater);
    }

    // NOCASE folds only ASCII letters; everything else is compared as-is.
    #[test]
    fn test_collation_nocase_ascii_only() {
        let coll = NoCaseCollation;
        // Non-ASCII letters keep their distinct encodings.
        assert_ne!(
            coll.compare("Ä".as_bytes(), "ä".as_bytes()),
            Ordering::Equal,
            "NOCASE must NOT fold non-ASCII"
        );
        // The last ASCII letter still folds.
        assert_eq!(coll.compare(b"Z", b"z"), Ordering::Equal);
        // '[' and '{' differ by 0x20 like a letter case pair, but they are
        // not letters and must stay distinct.
        assert_eq!(coll.compare(b"[", b"["), Ordering::Equal);
        assert_ne!(coll.compare(b"[", b"{"), Ordering::Equal);
    }

    // RTRIM ignores trailing spaces on either side.
    #[test]
    fn test_collation_rtrim() {
        let coll = RtrimCollation;
        assert_eq!(coll.compare(b"hello ", b"hello"), Ordering::Equal);
        // Symmetric: trailing spaces on the right operand are ignored too.
        assert_eq!(coll.compare(b"hello", b"hello "), Ordering::Equal);
        assert_eq!(coll.compare(b"hello ", b"hello "), Ordering::Equal);
        // A trailing non-space byte is significant.
        assert_ne!(coll.compare(b"hello!", b"hello"), Ordering::Equal);
        // After trimming, "hello" and "hello!" still differ.
        assert_ne!(coll.compare(b"hello ", b"hello!"), Ordering::Equal);
    }

    // RTRIM strips only the ASCII space byte, no other whitespace.
    #[test]
    fn test_collation_rtrim_tabs_not_stripped() {
        let coll = RtrimCollation;
        // A trailing tab is kept.
        assert_ne!(
            coll.compare(b"hello\t", b"hello"),
            Ordering::Equal,
            "RTRIM must NOT strip tabs"
        );
        // The UTF-8 encoding of U+00A0 (0xC2 0xA0) is kept.
        assert_ne!(
            coll.compare(b"hello\xc2\xa0", b"hello"),
            Ordering::Equal,
            "RTRIM must NOT strip non-breaking spaces"
        );
    }

    // ---------------------------------------------------------------
    // Ordering properties
    // ---------------------------------------------------------------

    // For every built-in, compare(a, b) must be the reverse of compare(b, a).
    #[test]
    fn test_collation_properties_antisymmetric() {
        let collations: Vec<Box<dyn CollationFunction>> = vec![
            Box::new(BinaryCollation),
            Box::new(NoCaseCollation),
            Box::new(RtrimCollation),
        ];

        let pairs: &[(&[u8], &[u8])] = &[
            (b"abc", b"def"),
            (b"hello", b"world"),
            (b"ABC", b"abc"),
            (b"hello ", b"hello"),
        ];

        for coll in &collations {
            for &(a, b) in pairs {
                let forward = coll.compare(a, b);
                let reverse = coll.compare(b, a);
                assert_eq!(
                    forward,
                    reverse.reverse(),
                    "{}: compare({:?}, {:?}) = {forward:?}, but reverse = {reverse:?}",
                    coll.name(),
                    std::str::from_utf8(a).unwrap_or("?"),
                    std::str::from_utf8(b).unwrap_or("?"),
                );
            }
        }
    }

    // Spot check of transitivity for BINARY: a < b and b < c implies a < c.
    #[test]
    fn test_collation_properties_transitive() {
        let coll = BinaryCollation;
        let a = b"apple";
        let b = b"banana";
        let c = b"cherry";

        assert_eq!(coll.compare(a, b), Ordering::Less);
        assert_eq!(coll.compare(b, c), Ordering::Less);
        assert_eq!(coll.compare(a, c), Ordering::Less);
    }

    // Compile-time check that the built-in collation types are Send + Sync.
    #[test]
    fn test_collation_send_sync() {
        fn assert_send_sync<T: Send + Sync>() {}
        assert_send_sync::<BinaryCollation>();
        assert_send_sync::<NoCaseCollation>();
        assert_send_sync::<RtrimCollation>();
    }

    // ---------------------------------------------------------------
    // Registry
    // ---------------------------------------------------------------

    // A fresh registry resolves all three built-ins and they behave as
    // their names promise.
    #[test]
    fn test_registry_preloaded_builtins() {
        let reg = CollationRegistry::new();
        assert!(reg.contains("BINARY"));
        assert!(reg.contains("NOCASE"));
        assert!(reg.contains("RTRIM"));

        let binary = reg.find("BINARY").expect("BINARY must be pre-registered");
        assert_eq!(binary.compare(b"a", b"b"), Ordering::Less);

        let nocase = reg.find("NOCASE").expect("NOCASE must be pre-registered");
        assert_eq!(nocase.compare(b"ABC", b"abc"), Ordering::Equal);

        let rtrim = reg.find("RTRIM").expect("RTRIM must be pre-registered");
        assert_eq!(rtrim.compare(b"x ", b"x"), Ordering::Equal);
    }

    // Test collation that inverts the byte ordering.
    struct ReverseCollation;

    impl CollationFunction for ReverseCollation {
        fn name(&self) -> &str {
            "REVERSE"
        }

        fn compare(&self, left: &[u8], right: &[u8]) -> Ordering {
            right.cmp(left)
        }
    }

    // Registering a new name returns None and makes the collation
    // discoverable regardless of the case used in the lookup.
    #[test]
    fn test_registry_custom_collation_registration() {
        let mut reg = CollationRegistry::new();

        let prev = reg.register(ReverseCollation);
        assert!(prev.is_none(), "no prior REVERSE collation");
        assert!(reg.contains("REVERSE"));

        let coll = reg.find("reverse").expect("case-insensitive lookup");
        assert_eq!(coll.compare(b"a", b"z"), Ordering::Greater);
    }

    // Test collation that claims the built-in name BINARY but treats all
    // inputs as equal, so an override is observable.
    struct AlwaysEqualCollation;

    impl CollationFunction for AlwaysEqualCollation {
        fn name(&self) -> &str {
            "BINARY"
        }

        fn compare(&self, _left: &[u8], _right: &[u8]) -> Ordering {
            Ordering::Equal
        }
    }

    // Registering under a built-in name returns the built-in as the previous
    // entry and shadows it for later lookups.
    #[test]
    fn test_registry_overwrite_builtin() {
        let mut reg = CollationRegistry::new();

        let prev = reg.register(AlwaysEqualCollation);
        assert!(prev.is_some(), "should return previous BINARY collation");

        let coll = reg.find("BINARY").unwrap();
        assert_eq!(
            coll.compare(b"a", b"z"),
            Ordering::Equal,
            "custom overwrite must take effect"
        );
    }

    // Unknown names are reported as absent by both find and contains.
    #[test]
    fn test_registry_unregistered_returns_none() {
        let reg = CollationRegistry::new();
        assert!(reg.find("NONEXISTENT").is_none());
        assert!(!reg.contains("NONEXISTENT"));
    }

    // Lookups ignore ASCII case for both find and contains.
    #[test]
    fn test_registry_name_case_insensitive() {
        let reg = CollationRegistry::new();
        assert!(reg.find("BINARY").is_some());
        assert!(reg.find("binary").is_some());
        assert!(reg.find("Binary").is_some());
        assert!(reg.find("bInArY").is_some());

        assert!(reg.contains("nocase"));
        assert!(reg.contains("NOCASE"));
        assert!(reg.contains("NoCase"));
    }

    // ---------------------------------------------------------------
    // Collation selection between two operands
    // ---------------------------------------------------------------

    // Shorthand for building an annotation in the selection tests.
    fn ann(name: &str, source: CollationSource) -> CollationAnnotation {
        CollationAnnotation {
            name: name.to_owned(),
            source,
        }
    }

    // Explicit on the left beats Default on the right.
    #[test]
    fn test_collation_selection_explicit_wins() {
        let result = resolve_collation(
            &ann("NOCASE", CollationSource::Explicit),
            &ann("BINARY", CollationSource::Default),
        );
        assert_eq!(result, "NOCASE");
    }

    // Explicit on the right beats Default on the left.
    #[test]
    fn test_collation_selection_explicit_rhs_wins_over_default() {
        let result = resolve_collation(
            &ann("BINARY", CollationSource::Default),
            &ann("RTRIM", CollationSource::Explicit),
        );
        assert_eq!(result, "RTRIM");
    }

    // When both sides are Explicit the left operand wins.
    #[test]
    fn test_collation_selection_leftmost_explicit_wins() {
        let result = resolve_collation(
            &ann("NOCASE", CollationSource::Explicit),
            &ann("RTRIM", CollationSource::Explicit),
        );
        assert_eq!(result, "NOCASE");
    }

    // Schema on the left beats Default on the right.
    #[test]
    fn test_collation_selection_schema_over_default() {
        let result = resolve_collation(
            &ann("NOCASE", CollationSource::Schema),
            &ann("BINARY", CollationSource::Default),
        );
        assert_eq!(result, "NOCASE");
    }

    // Schema on the right beats Default on the left.
    #[test]
    fn test_collation_selection_schema_rhs_over_default() {
        let result = resolve_collation(
            &ann("BINARY", CollationSource::Default),
            &ann("NOCASE", CollationSource::Schema),
        );
        assert_eq!(result, "NOCASE");
    }

    // Explicit beats Schema.
    #[test]
    fn test_collation_selection_explicit_over_schema() {
        let result = resolve_collation(
            &ann("RTRIM", CollationSource::Explicit),
            &ann("NOCASE", CollationSource::Schema),
        );
        assert_eq!(result, "RTRIM");
    }

    // Two Default annotations resolve to the left operand's name.
    #[test]
    fn test_collation_selection_default_binary() {
        let result = resolve_collation(
            &ann("BINARY", CollationSource::Default),
            &ann("BINARY", CollationSource::Default),
        );
        assert_eq!(result, "BINARY");
    }

    // ---------------------------------------------------------------
    // Collation-aware operations built on compare()
    // ---------------------------------------------------------------

    // A minimum computed with BINARY picks "ABC"; under NOCASE the two
    // candidates are equal.
    #[test]
    fn test_min_respects_collation() {
        let binary = BinaryCollation;
        let binary_min = if binary.compare(b"ABC", b"abc") == Ordering::Less {
            "ABC"
        } else {
            "abc"
        };
        assert_eq!(binary_min, "ABC");

        let nocase = NoCaseCollation;
        assert_eq!(nocase.compare(b"ABC", b"abc"), Ordering::Equal);
    }

    // A maximum computed with BINARY picks the lower-case string.
    #[test]
    fn test_max_respects_collation() {
        let binary = BinaryCollation;
        let binary_max = if binary.compare(b"abc", b"ABC") == Ordering::Greater {
            "abc"
        } else {
            "ABC"
        };
        assert_eq!(binary_max, "abc");
    }

    // Sorting with NOCASE interleaves upper- and lower-case words
    // alphabetically.
    #[test]
    fn test_collation_aware_sort() {
        let nocase = NoCaseCollation;
        let mut data: Vec<&[u8]> = vec![b"Banana", b"apple", b"Cherry", b"date"];
        data.sort_by(|a, b| nocase.compare(a, b));

        assert_eq!(data[0], b"apple");
        assert_eq!(data[1], b"Banana");
        assert_eq!(data[2], b"Cherry");
        assert_eq!(data[3], b"date");
    }

    // Grouping adjacent NOCASE-equal items after sorting yields one group
    // per distinct case-insensitive value.
    #[test]
    fn test_collation_aware_group_by() {
        let nocase = NoCaseCollation;
        let items: Vec<&[u8]> = vec![b"ABC", b"abc", b"Abc", b"def", b"DEF"];
        let mut groups: Vec<Vec<&[u8]>> = Vec::new();

        let mut sorted = items;
        // Sort first so equal items become neighbours.
        sorted.sort_by(|a, b| nocase.compare(a, b));

        let mut current_group: Vec<&[u8]> = vec![sorted[0]];
        for window in sorted.windows(2) {
            // A change between neighbours closes the current group.
            if nocase.compare(window[0], window[1]) != Ordering::Equal {
                groups.push(std::mem::take(&mut current_group));
            }
            current_group.push(window[1]);
        }
        groups.push(current_group);

        assert_eq!(groups.len(), 2);
        // Three spellings of "abc", two of "def".
        assert_eq!(groups[0].len(), 3);
        assert_eq!(groups[1].len(), 2);
    }

    // De-duplicating with NOCASE keeps one representative per
    // case-insensitive value.
    #[test]
    fn test_collation_aware_distinct() {
        let nocase = NoCaseCollation;
        let items: Vec<&[u8]> = vec![b"ABC", b"abc", b"Abc", b"def", b"DEF"];

        let mut distinct: Vec<&[u8]> = Vec::new();
        for item in &items {
            let already = distinct
                .iter()
                .any(|d| nocase.compare(d, item) == Ordering::Equal);
            if !already {
                distinct.push(item);
            }
        }

        assert_eq!(distinct.len(), 2);
    }

    // ---------------------------------------------------------------
    // Trait impls on the supporting types
    // ---------------------------------------------------------------

    // Default::default() yields a registry that knows the built-ins.
    #[test]
    fn test_registry_default_impl() {
        let reg = CollationRegistry::default();
        assert!(reg.contains("BINARY"));
        assert!(reg.contains("NOCASE"));
        assert!(reg.contains("RTRIM"));
    }

    // The derived Debug output mentions both the name and the source.
    #[test]
    fn test_collation_annotation_debug() {
        let ann = CollationAnnotation {
            name: "NOCASE".to_owned(),
            source: CollationSource::Explicit,
        };
        let debug_str = format!("{ann:?}");
        assert!(debug_str.contains("NOCASE"));
        assert!(debug_str.contains("Explicit"));
    }

    // The derived equality distinguishes the variants.
    #[test]
    fn test_collation_source_equality() {
        assert_eq!(CollationSource::Explicit, CollationSource::Explicit);
        assert_ne!(CollationSource::Explicit, CollationSource::Schema);
        assert_ne!(CollationSource::Schema, CollationSource::Default);
    }
}