1#![allow(clippy::unnecessary_literal_bound)]
18
19use std::cmp::Ordering;
20use std::collections::HashMap;
21use std::sync::Arc;
22
23use tracing::{debug, info};
24
/// A named comparison routine over raw byte strings.
///
/// Implementations must be `Send + Sync`; `CollationRegistry` in this file
/// stores them as `Arc<dyn CollationFunction>` and hands out clones of that
/// handle.
pub trait CollationFunction: Send + Sync {
    /// Returns the name this collation is registered and looked up under.
    ///
    /// `CollationRegistry::register` upper-cases the returned value (ASCII
    /// only) before using it as the map key.
    fn name(&self) -> &str;

    /// Orders `left` relative to `right`.
    ///
    /// Both operands are plain bytes; nothing in this trait requires them to
    /// be valid UTF-8.
    fn compare(&self, left: &[u8], right: &[u8]) -> Ordering;
}
40
41pub struct BinaryCollation;
48
49impl CollationFunction for BinaryCollation {
50 fn name(&self) -> &str {
51 "BINARY"
52 }
53
54 fn compare(&self, left: &[u8], right: &[u8]) -> Ordering {
55 left.cmp(right)
56 }
57}
58
59pub struct NoCaseCollation;
64
65impl CollationFunction for NoCaseCollation {
66 fn name(&self) -> &str {
67 "NOCASE"
68 }
69
70 fn compare(&self, left: &[u8], right: &[u8]) -> Ordering {
71 let l = left.iter().map(u8::to_ascii_lowercase);
72 let r = right.iter().map(u8::to_ascii_lowercase);
73 l.cmp(r)
74 }
75}
76
77pub struct RtrimCollation;
82
83impl CollationFunction for RtrimCollation {
84 fn name(&self) -> &str {
85 "RTRIM"
86 }
87
88 fn compare(&self, left: &[u8], right: &[u8]) -> Ordering {
89 let l = strip_trailing_spaces(left);
90 let r = strip_trailing_spaces(right);
91 l.cmp(r)
92 }
93}
94
/// Returns the prefix of `s` that remains after dropping every trailing
/// space byte (`b' '`).
///
/// Other whitespace (tabs, newlines, multi-byte spaces) is kept. An input
/// made only of spaces, or an empty input, yields an empty slice.
fn strip_trailing_spaces(s: &[u8]) -> &[u8] {
    // Index one past the last non-space byte, or 0 when there is none.
    let kept = s
        .iter()
        .rposition(|&byte| byte != b' ')
        .map_or(0, |last| last + 1);
    &s[..kept]
}
102
103pub struct CollationRegistry {
110 collations: HashMap<String, Arc<dyn CollationFunction>>,
111}
112
113impl Default for CollationRegistry {
114 fn default() -> Self {
115 Self::new()
116 }
117}
118
119impl CollationRegistry {
120 #[must_use]
122 pub fn new() -> Self {
123 let mut collations = HashMap::with_capacity(3);
124 collations.insert(
125 "BINARY".to_owned(),
126 Arc::new(BinaryCollation) as Arc<dyn CollationFunction>,
127 );
128 collations.insert(
129 "NOCASE".to_owned(),
130 Arc::new(NoCaseCollation) as Arc<dyn CollationFunction>,
131 );
132 collations.insert(
133 "RTRIM".to_owned(),
134 Arc::new(RtrimCollation) as Arc<dyn CollationFunction>,
135 );
136 Self { collations }
137 }
138
139 pub fn register<C: CollationFunction + 'static>(
144 &mut self,
145 collation: C,
146 ) -> Option<Arc<dyn CollationFunction>> {
147 let name = collation.name().to_ascii_uppercase();
148 info!(collation_name = %name, deterministic = true, "custom collation registration");
149 self.collations.insert(name, Arc::new(collation))
150 }
151
152 #[must_use]
156 pub fn find(&self, name: &str) -> Option<Arc<dyn CollationFunction>> {
157 let canon = name.to_ascii_uppercase();
158 let result = self.collations.get(&canon).cloned();
159 debug!(
160 collation = %canon,
161 hit = result.is_some(),
162 "collation registry lookup"
163 );
164 result
165 }
166
167 #[must_use]
169 pub fn contains(&self, name: &str) -> bool {
170 self.collations.contains_key(&name.to_ascii_uppercase())
171 }
172}
173
/// Where an operand's collation came from.
///
/// `resolve_collation` uses this to pick between two operands: `Explicit`
/// outranks `Schema`, which outranks `Default`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum CollationSource {
    /// Strongest source; when both operands are `Explicit` the left one wins.
    // NOTE(review): presumably an explicit COLLATE clause on the expression; confirm with callers.
    Explicit,
    /// Middle rank: beats `Default`, loses to `Explicit`.
    // NOTE(review): presumably the collation declared for a column in the schema; confirm.
    Schema,
    /// Weakest source; only chosen when the other operand is also `Default`.
    Default,
}
189
190#[derive(Debug, Clone)]
192pub struct CollationAnnotation {
193 pub name: String,
195 pub source: CollationSource,
197}
198
199#[must_use]
209pub fn resolve_collation(lhs: &CollationAnnotation, rhs: &CollationAnnotation) -> String {
210 let result = match (lhs.source, rhs.source) {
212 (_, CollationSource::Explicit) if lhs.source != CollationSource::Explicit => &rhs.name,
213 (CollationSource::Default, CollationSource::Schema) => &rhs.name,
214 _ => &lhs.name,
215 };
216 debug!(
217 collation = %result,
218 lhs_source = ?lhs.source,
219 rhs_source = ?rhs.source,
220 context = "COMPARE",
221 "collation selection"
222 );
223 result.clone()
224}
225
#[cfg(test)]
mod tests {
    use super::*;

    // ---- Built-in collations -------------------------------------------

    /// BINARY compares raw bytes, including the bytes of multi-byte UTF-8.
    #[test]
    fn test_collation_binary_memcmp() {
        let binary = BinaryCollation;
        let cases: &[(&[u8], &[u8], Ordering)] = &[
            (b"abc", b"abc", Ordering::Equal),
            (b"abc", b"abd", Ordering::Less),
            (b"abd", b"abc", Ordering::Greater),
            (b"ABC", b"abc", Ordering::Less),
            ("café".as_bytes(), "café".as_bytes(), Ordering::Equal),
        ];
        for &(left, right, expected) in cases {
            assert_eq!(binary.compare(left, right), expected);
        }
        assert_ne!(binary.compare("über".as_bytes(), b"uber"), Ordering::Equal);
    }

    /// BINARY is case-sensitive and treats bytes as unsigned values.
    #[test]
    fn test_collation_binary_basic() {
        let binary = BinaryCollation;
        assert_eq!(Ordering::Less, binary.compare(b"ABC", b"abc"));
        assert_eq!(Ordering::Less, binary.compare(b"\x00", b"\x01"));
        assert_eq!(Ordering::Greater, binary.compare(b"\xff", b"\x00"));
    }

    /// NOCASE treats upper- and lower-case ASCII letters as equal.
    #[test]
    fn test_collation_nocase_ascii() {
        let nocase = NoCaseCollation;
        let cases: &[(&[u8], &[u8], Ordering)] = &[
            (b"ABC", b"abc", Ordering::Equal),
            (b"Alice", b"alice", Ordering::Equal),
            (b"A", b"b", Ordering::Less),
        ];
        for &(left, right, expected) in cases {
            assert_eq!(nocase.compare(left, right), expected);
        }
    }

    /// NOCASE folding stops at the ASCII letters.
    #[test]
    fn test_collation_nocase_ascii_only() {
        let nocase = NoCaseCollation;
        let upper = "Ä".as_bytes();
        let lower = "ä".as_bytes();
        assert_ne!(
            nocase.compare(upper, lower),
            Ordering::Equal,
            "NOCASE must NOT fold non-ASCII"
        );
        assert_eq!(nocase.compare(b"Z", b"z"), Ordering::Equal);
        assert_eq!(nocase.compare(b"[", b"["), Ordering::Equal);
        assert_ne!(nocase.compare(b"[", b"{"), Ordering::Equal);
    }

    /// RTRIM ignores trailing spaces on either side, and nothing else.
    #[test]
    fn test_collation_rtrim() {
        let rtrim = RtrimCollation;
        let equal_pairs: &[(&[u8], &[u8])] = &[
            (b"hello ", b"hello"),
            (b"hello", b"hello "),
            (b"hello ", b"hello "),
        ];
        for &(left, right) in equal_pairs {
            assert_eq!(rtrim.compare(left, right), Ordering::Equal);
        }
        assert_ne!(rtrim.compare(b"hello!", b"hello"), Ordering::Equal);
        assert_ne!(rtrim.compare(b"hello ", b"hello!"), Ordering::Equal);
    }

    /// RTRIM strips the space byte only: tabs and NBSP stay significant.
    #[test]
    fn test_collation_rtrim_tabs_not_stripped() {
        let rtrim = RtrimCollation;
        let with_tab = rtrim.compare(b"hello\t", b"hello");
        assert_ne!(with_tab, Ordering::Equal, "RTRIM must NOT strip tabs");
        let with_nbsp = rtrim.compare(b"hello\xc2\xa0", b"hello");
        assert_ne!(
            with_nbsp,
            Ordering::Equal,
            "RTRIM must NOT strip non-breaking spaces"
        );
    }

    /// Swapping the operands must reverse the result for every built-in.
    #[test]
    fn test_collation_properties_antisymmetric() {
        let collations: [&dyn CollationFunction; 3] =
            [&BinaryCollation, &NoCaseCollation, &RtrimCollation];

        let pairs: &[(&[u8], &[u8])] = &[
            (b"abc", b"def"),
            (b"hello", b"world"),
            (b"ABC", b"abc"),
            (b"hello ", b"hello"),
        ];

        for coll in collations.iter() {
            for &(a, b) in pairs.iter() {
                // `forward` / `reverse` are captured by name in the message below.
                let forward = coll.compare(a, b);
                let reverse = coll.compare(b, a);
                assert_eq!(
                    forward,
                    reverse.reverse(),
                    "{}: compare({:?}, {:?}) = {forward:?}, but reverse = {reverse:?}",
                    coll.name(),
                    std::str::from_utf8(a).unwrap_or("?"),
                    std::str::from_utf8(b).unwrap_or("?"),
                );
            }
        }
    }

    /// a < b and b < c must imply a < c.
    #[test]
    fn test_collation_properties_transitive() {
        let binary = BinaryCollation;
        let (first, second, third) = (b"apple", b"banana", b"cherry");

        assert_eq!(binary.compare(first, second), Ordering::Less);
        assert_eq!(binary.compare(second, third), Ordering::Less);
        assert_eq!(binary.compare(first, third), Ordering::Less);
    }

    /// Compile-time check that the built-ins can be shared across threads.
    #[test]
    fn test_collation_send_sync() {
        fn assert_send_sync<T: Send + Sync>() {}
        assert_send_sync::<BinaryCollation>();
        assert_send_sync::<NoCaseCollation>();
        assert_send_sync::<RtrimCollation>();
    }

    // ---- Registry --------------------------------------------------------

    /// A fresh registry exposes working BINARY, NOCASE and RTRIM entries.
    #[test]
    fn test_registry_preloaded_builtins() {
        let registry = CollationRegistry::new();
        for builtin in ["BINARY", "NOCASE", "RTRIM"] {
            assert!(registry.contains(builtin));
        }

        let binary = registry
            .find("BINARY")
            .expect("BINARY must be pre-registered");
        assert_eq!(binary.compare(b"a", b"b"), Ordering::Less);

        let nocase = registry
            .find("NOCASE")
            .expect("NOCASE must be pre-registered");
        assert_eq!(nocase.compare(b"ABC", b"abc"), Ordering::Equal);

        let rtrim = registry
            .find("RTRIM")
            .expect("RTRIM must be pre-registered");
        assert_eq!(rtrim.compare(b"x ", b"x"), Ordering::Equal);
    }

    /// Custom collation that inverts the byte-wise order.
    struct ReverseCollation;

    impl CollationFunction for ReverseCollation {
        fn name(&self) -> &str {
            "REVERSE"
        }

        fn compare(&self, left: &[u8], right: &[u8]) -> Ordering {
            left.cmp(right).reverse()
        }
    }

    /// Registering a new name returns `None` and makes it discoverable.
    #[test]
    fn test_registry_custom_collation_registration() {
        let mut registry = CollationRegistry::new();

        let replaced = registry.register(ReverseCollation);
        assert!(replaced.is_none(), "no prior REVERSE collation");
        assert!(registry.contains("REVERSE"));

        let reverse = registry.find("reverse").expect("case-insensitive lookup");
        assert_eq!(reverse.compare(b"a", b"z"), Ordering::Greater);
    }

    /// Custom collation that claims the BINARY name and equates everything.
    struct AlwaysEqualCollation;

    impl CollationFunction for AlwaysEqualCollation {
        fn name(&self) -> &str {
            "BINARY"
        }

        fn compare(&self, _left: &[u8], _right: &[u8]) -> Ordering {
            Ordering::Equal
        }
    }

    /// Registering under a built-in name replaces the built-in.
    #[test]
    fn test_registry_overwrite_builtin() {
        let mut registry = CollationRegistry::new();

        let replaced = registry.register(AlwaysEqualCollation);
        assert!(
            replaced.is_some(),
            "should return previous BINARY collation"
        );

        let current = registry.find("BINARY").unwrap();
        assert_eq!(
            current.compare(b"a", b"z"),
            Ordering::Equal,
            "custom overwrite must take effect"
        );
    }

    /// Unknown names are reported as absent by both lookup methods.
    #[test]
    fn test_registry_unregistered_returns_none() {
        let registry = CollationRegistry::new();
        assert!(registry.find("NONEXISTENT").is_none());
        assert!(!registry.contains("NONEXISTENT"));
    }

    /// Lookups ignore ASCII case.
    #[test]
    fn test_registry_name_case_insensitive() {
        let registry = CollationRegistry::new();
        for spelling in ["BINARY", "binary", "Binary", "bInArY"] {
            assert!(registry.find(spelling).is_some());
        }
        for spelling in ["nocase", "NOCASE", "NoCase"] {
            assert!(registry.contains(spelling));
        }
    }

    // ---- Collation selection ---------------------------------------------

    /// Shorthand for building an annotation from a name and a source.
    fn ann(name: &str, source: CollationSource) -> CollationAnnotation {
        CollationAnnotation {
            name: String::from(name),
            source,
        }
    }

    /// Explicit on the left beats Default on the right.
    #[test]
    fn test_collation_selection_explicit_wins() {
        let lhs = ann("NOCASE", CollationSource::Explicit);
        let rhs = ann("BINARY", CollationSource::Default);
        assert_eq!(resolve_collation(&lhs, &rhs), "NOCASE");
    }

    /// Explicit on the right beats Default on the left.
    #[test]
    fn test_collation_selection_explicit_rhs_wins_over_default() {
        let lhs = ann("BINARY", CollationSource::Default);
        let rhs = ann("RTRIM", CollationSource::Explicit);
        assert_eq!(resolve_collation(&lhs, &rhs), "RTRIM");
    }

    /// With two Explicit operands the left one is used.
    #[test]
    fn test_collation_selection_leftmost_explicit_wins() {
        let lhs = ann("NOCASE", CollationSource::Explicit);
        let rhs = ann("RTRIM", CollationSource::Explicit);
        assert_eq!(resolve_collation(&lhs, &rhs), "NOCASE");
    }

    /// Schema on the left beats Default on the right.
    #[test]
    fn test_collation_selection_schema_over_default() {
        let lhs = ann("NOCASE", CollationSource::Schema);
        let rhs = ann("BINARY", CollationSource::Default);
        assert_eq!(resolve_collation(&lhs, &rhs), "NOCASE");
    }

    /// Schema on the right beats Default on the left.
    #[test]
    fn test_collation_selection_schema_rhs_over_default() {
        let lhs = ann("BINARY", CollationSource::Default);
        let rhs = ann("NOCASE", CollationSource::Schema);
        assert_eq!(resolve_collation(&lhs, &rhs), "NOCASE");
    }

    /// Explicit beats Schema.
    #[test]
    fn test_collation_selection_explicit_over_schema() {
        let lhs = ann("RTRIM", CollationSource::Explicit);
        let rhs = ann("NOCASE", CollationSource::Schema);
        assert_eq!(resolve_collation(&lhs, &rhs), "RTRIM");
    }

    /// Two Default operands resolve to the left operand's name.
    #[test]
    fn test_collation_selection_default_binary() {
        let lhs = ann("BINARY", CollationSource::Default);
        let rhs = ann("BINARY", CollationSource::Default);
        assert_eq!(resolve_collation(&lhs, &rhs), "BINARY");
    }

    // ---- Collation-aware operations ----------------------------------------

    /// A MIN computed with BINARY picks "ABC"; NOCASE sees the two as equal.
    #[test]
    fn test_min_respects_collation() {
        let binary = BinaryCollation;
        let binary_min = match binary.compare(b"ABC", b"abc") {
            Ordering::Less => "ABC",
            _ => "abc",
        };
        assert_eq!(binary_min, "ABC");

        let nocase = NoCaseCollation;
        assert_eq!(nocase.compare(b"ABC", b"abc"), Ordering::Equal);
    }

    /// A MAX computed with BINARY picks "abc".
    #[test]
    fn test_max_respects_collation() {
        let binary = BinaryCollation;
        let binary_max = match binary.compare(b"abc", b"ABC") {
            Ordering::Greater => "abc",
            _ => "ABC",
        };
        assert_eq!(binary_max, "abc");
    }

    /// Sorting with NOCASE interleaves upper- and lower-case words.
    #[test]
    fn test_collation_aware_sort() {
        let nocase = NoCaseCollation;
        let mut words: Vec<&[u8]> = vec![b"Banana", b"apple", b"Cherry", b"date"];
        words.sort_by(|a, b| nocase.compare(a, b));

        let expected: Vec<&[u8]> = vec![b"apple", b"Banana", b"Cherry", b"date"];
        assert_eq!(words, expected);
    }

    /// Grouping sorted rows by NOCASE equality yields {ABC, abc, Abc} and {def, DEF}.
    #[test]
    fn test_collation_aware_group_by() {
        let nocase = NoCaseCollation;
        let mut sorted: Vec<&[u8]> = vec![b"ABC", b"abc", b"Abc", b"def", b"DEF"];
        sorted.sort_by(|a, b| nocase.compare(a, b));

        let mut groups: Vec<Vec<&[u8]>> = Vec::new();
        let mut current: Vec<&[u8]> = vec![sorted[0]];
        for pair in sorted.windows(2) {
            let (previous, next) = (pair[0], pair[1]);
            // A change between neighbours closes the running group.
            if nocase.compare(previous, next) != Ordering::Equal {
                groups.push(std::mem::take(&mut current));
            }
            current.push(next);
        }
        groups.push(current);

        let sizes: Vec<usize> = groups.iter().map(Vec::len).collect();
        assert_eq!(sizes, vec![3_usize, 2]);
    }

    /// De-duplicating with NOCASE keeps one representative per group.
    #[test]
    fn test_collation_aware_distinct() {
        let nocase = NoCaseCollation;
        let items: Vec<&[u8]> = vec![b"ABC", b"abc", b"Abc", b"def", b"DEF"];

        let mut distinct: Vec<&[u8]> = Vec::new();
        for &candidate in &items {
            let is_new = distinct
                .iter()
                .all(|&seen| nocase.compare(seen, candidate) != Ordering::Equal);
            if is_new {
                distinct.push(candidate);
            }
        }

        assert_eq!(distinct.len(), 2);
    }

    // ---- Trait impls on the public types -------------------------------------

    /// `Default` gives the same pre-populated registry as `new`.
    #[test]
    fn test_registry_default_impl() {
        let registry = CollationRegistry::default();
        for builtin in ["BINARY", "NOCASE", "RTRIM"] {
            assert!(registry.contains(builtin));
        }
    }

    /// The derived `Debug` output mentions both the name and the source.
    #[test]
    fn test_collation_annotation_debug() {
        let ann = ann("NOCASE", CollationSource::Explicit);
        let debug_str = format!("{ann:?}");
        for needle in ["NOCASE", "Explicit"] {
            assert!(debug_str.contains(needle));
        }
    }

    /// Sources compare equal only to themselves.
    #[test]
    fn test_collation_source_equality() {
        assert_eq!(CollationSource::Explicit, CollationSource::Explicit);
        assert_ne!(CollationSource::Explicit, CollationSource::Schema);
        assert_ne!(CollationSource::Schema, CollationSource::Default);
    }
}