1use std::cmp::Ordering;
2use std::future::Future;
3use std::pin::Pin;
4use std::sync::Arc;
5use std::task::{Context, Poll};
6
7use tower::Service;
8
9use camel_api::{CamelError, Exchange};
10
11#[derive(Debug, Clone)]
15pub struct SortKey(pub serde_json::Value);
16
17impl Ord for SortKey {
18 fn cmp(&self, other: &Self) -> Ordering {
19 use serde_json::Value::*;
20 match (&self.0, &other.0) {
21 (Null, Null) => Ordering::Equal,
22 (Null, _) => Ordering::Less,
23 (_, Null) => Ordering::Greater,
24 (Bool(a), Bool(b)) => a.cmp(b),
25 (Bool(_), Number(_)) | (Bool(_), String(_)) => Ordering::Less,
26 (Number(_), Bool(_)) | (String(_), Bool(_)) => Ordering::Greater,
27 (Number(a), Number(b)) => {
30 let af = a.as_f64().unwrap_or(f64::INFINITY);
31 let bf = b.as_f64().unwrap_or(f64::INFINITY);
32 af.partial_cmp(&bf)
33 .unwrap_or_else(|| af.is_nan().cmp(&bf.is_nan()))
34 }
35 (Number(_), String(_)) => Ordering::Less,
36 (String(_), Number(_)) => Ordering::Greater,
37 (String(a), String(b)) => a.cmp(b),
38 _ => Ordering::Equal, }
40 }
41}
42
43impl PartialOrd for SortKey {
44 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
45 Some(self.cmp(other))
46 }
47}
48
49impl PartialEq for SortKey {
50 fn eq(&self, other: &Self) -> bool {
51 self.cmp(other) == Ordering::Equal
52 }
53}
54
55impl Eq for SortKey {}
56
57pub type SortExpression =
60 Arc<dyn Fn(&serde_json::Value) -> Result<SortKey, CamelError> + Send + Sync>;
61
62#[derive(Clone)]
67pub struct SortService {
68 expression: SortExpression,
69 reverse: bool,
70}
71
72impl SortService {
73 pub fn new(expression: SortExpression, reverse: bool) -> Self {
74 Self {
75 expression,
76 reverse,
77 }
78 }
79}
80
81impl Service<Exchange> for SortService {
82 type Response = Exchange;
83 type Error = CamelError;
84 type Future = Pin<Box<dyn Future<Output = Result<Exchange, CamelError>> + Send>>;
85
86 fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
87 Poll::Ready(Ok(()))
88 }
89
90 fn call(&mut self, mut exchange: Exchange) -> Self::Future {
91 let expression = Arc::clone(&self.expression);
92 let reverse = self.reverse;
93 Box::pin(async move {
94 let array = match std::mem::take(&mut exchange.input.body) {
95 camel_api::body::Body::Json(serde_json::Value::Array(arr)) => arr,
96 _ => {
97 return Err(CamelError::ProcessorError(
98 "sort requires an array body (Body::Json(Value::Array))".into(),
99 ));
100 }
101 };
102
103 let mut indexed: Vec<(SortKey, serde_json::Value)> = Vec::with_capacity(array.len());
104 for element in array {
105 let key = expression(&element)?;
106 indexed.push((key, element));
107 }
108
109 if reverse {
110 indexed.sort_by(|a, b| b.0.cmp(&a.0));
111 } else {
112 indexed.sort_by(|a, b| a.0.cmp(&b.0));
113 }
114
115 let sorted: Vec<serde_json::Value> = indexed.into_iter().map(|(_, v)| v).collect();
116 exchange.input.body = camel_api::body::Body::Json(serde_json::Value::Array(sorted));
117 Ok(exchange)
118 })
119 }
120}
121
122#[cfg(test)]
123mod tests {
124 use super::*;
125 use camel_api::{Exchange, Message, body::Body};
126 use serde_json::json;
127 use tower::ServiceExt;
128
129 #[tokio::test]
130 async fn ascending_numeric_sort() {
131 let exchange = Exchange::new(Message::new(Body::Json(json!([3, 1, 2]))));
132 let expr: SortExpression = Arc::new(|v| Ok(SortKey(v.clone())));
133 let svc = SortService::new(expr, false);
134 let result = svc.oneshot(exchange).await.unwrap();
135 assert_eq!(result.input.body, Body::Json(json!([1, 2, 3])));
136 }
137
138 #[tokio::test]
139 async fn descending_numeric_sort() {
140 let exchange = Exchange::new(Message::new(Body::Json(json!([3, 1, 2]))));
141 let expr: SortExpression = Arc::new(|v| Ok(SortKey(v.clone())));
142 let svc = SortService::new(expr, true);
143 let result = svc.oneshot(exchange).await.unwrap();
144 assert_eq!(result.input.body, Body::Json(json!([3, 2, 1])));
145 }
146
147 #[tokio::test]
148 async fn string_sort() {
149 let exchange = Exchange::new(Message::new(Body::Json(json!([
150 "banana", "apple", "cherry"
151 ]))));
152 let expr: SortExpression = Arc::new(|v| Ok(SortKey(v.clone())));
153 let svc = SortService::new(expr, false);
154 let result = svc.oneshot(exchange).await.unwrap();
155 assert_eq!(
156 result.input.body,
157 Body::Json(json!(["apple", "banana", "cherry"]))
158 );
159 }
160
161 #[tokio::test]
162 async fn empty_array_passthrough() {
163 let exchange = Exchange::new(Message::new(Body::Json(json!([]))));
164 let expr: SortExpression = Arc::new(|v| Ok(SortKey(v.clone())));
165 let svc = SortService::new(expr, false);
166 let result = svc.oneshot(exchange).await.unwrap();
167 assert_eq!(result.input.body, Body::Json(json!([])));
168 }
169
170 #[tokio::test]
171 async fn non_array_body_errors() {
172 let exchange = Exchange::new(Message::new(Body::Text("hello".to_string())));
173 let expr: SortExpression = Arc::new(|v| Ok(SortKey(v.clone())));
174 let svc = SortService::new(expr, false);
175 let result = svc.oneshot(exchange).await;
176 assert!(matches!(result, Err(CamelError::ProcessorError(_))));
177 }
178
179 #[tokio::test]
180 async fn array_key_errors() {
181 let exchange = Exchange::new(Message::new(Body::Json(json!([[1, 2], 3]))));
182 let expr: SortExpression = Arc::new(|v| {
183 if v.is_array() {
184 Err(CamelError::ProcessorError("array key rejected".into()))
185 } else {
186 Ok(SortKey(v.clone()))
187 }
188 });
189 let svc = SortService::new(expr, false);
190 let result = svc.oneshot(exchange).await;
191 assert!(result.is_err());
192 }
193
194 #[tokio::test]
195 async fn mixed_type_sort_key_order() {
196 let exchange = Exchange::new(Message::new(Body::Json(json!([
198 "str", null, false, 42, true, 1
199 ]))));
200 let expr: SortExpression = Arc::new(|v| Ok(SortKey(v.clone())));
201 let svc = SortService::new(expr, false);
202 let result = svc.oneshot(exchange).await.unwrap();
203 assert_eq!(
205 result.input.body,
206 Body::Json(json!([null, false, true, 1, 42, "str"]))
207 );
208 }
209}