Skip to main content

camel_processor/
sort.rs

1use std::cmp::Ordering;
2use std::future::Future;
3use std::pin::Pin;
4use std::sync::Arc;
5use std::task::{Context, Poll};
6
7use tower::Service;
8
9use camel_api::{CamelError, Exchange};
10
11/// Total order over a JSON value used as a sort key.
12/// Tier: Null < Bool < Number < String.
13/// Array/Object keys are REJECTED at extraction.
14#[derive(Debug, Clone)]
15pub struct SortKey(pub serde_json::Value);
16
17impl Ord for SortKey {
18    fn cmp(&self, other: &Self) -> Ordering {
19        use serde_json::Value::*;
20        match (&self.0, &other.0) {
21            (Null, Null) => Ordering::Equal,
22            (Null, _) => Ordering::Less,
23            (_, Null) => Ordering::Greater,
24            (Bool(a), Bool(b)) => a.cmp(b),
25            (Bool(_), Number(_)) | (Bool(_), String(_)) => Ordering::Less,
26            (Number(_), Bool(_)) | (String(_), Bool(_)) => Ordering::Greater,
27            // ponytail: serde_json::Number cannot represent NaN/Infinity; this branch
28            // is defensive and will never fire for valid JSON input.
29            (Number(a), Number(b)) => {
30                let af = a.as_f64().unwrap_or(f64::INFINITY);
31                let bf = b.as_f64().unwrap_or(f64::INFINITY);
32                af.partial_cmp(&bf)
33                    .unwrap_or_else(|| af.is_nan().cmp(&bf.is_nan()))
34            }
35            (Number(_), String(_)) => Ordering::Less,
36            (String(_), Number(_)) => Ordering::Greater,
37            (String(a), String(b)) => a.cmp(b),
38            _ => Ordering::Equal, // defensive fallback for Array/Object (shouldn't reach here)
39        }
40    }
41}
42
43impl PartialOrd for SortKey {
44    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
45        Some(self.cmp(other))
46    }
47}
48
49impl PartialEq for SortKey {
50    fn eq(&self, other: &Self) -> bool {
51        self.cmp(other) == Ordering::Equal
52    }
53}
54
55impl Eq for SortKey {}
56
/// Shared, thread-safe callback that extracts a sort key from one element of the body array.
///
/// The callback receives a reference to the element and returns the key to sort by, or a
/// `CamelError` if no key can be produced for that element.
/// It MUST return a scalar (Null/Bool/Number/String); returning Array/Object is a user error.
pub type SortExpression =
    Arc<dyn Fn(&serde_json::Value) -> Result<SortKey, CamelError> + Send + Sync>;
61
/// SortService: order body collection by expression.
///
/// Process-mode leaf processor (no child pipeline). Requires `Body::Json(Value::Array(_))`.
/// Non-array/non-Json body → Err. An error returned by the expression is propagated to the
/// caller. Keys are expected to be scalars (see `SortExpression`).
#[derive(Clone)]
pub struct SortService {
    // Extracts the sort key from each array element.
    expression: SortExpression,
    // When true, elements are ordered from the largest key to the smallest.
    reverse: bool,
}
71
72impl SortService {
73    pub fn new(expression: SortExpression, reverse: bool) -> Self {
74        Self {
75            expression,
76            reverse,
77        }
78    }
79}
80
81impl Service<Exchange> for SortService {
82    type Response = Exchange;
83    type Error = CamelError;
84    type Future = Pin<Box<dyn Future<Output = Result<Exchange, CamelError>> + Send>>;
85
86    fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
87        Poll::Ready(Ok(()))
88    }
89
90    fn call(&mut self, mut exchange: Exchange) -> Self::Future {
91        let expression = Arc::clone(&self.expression);
92        let reverse = self.reverse;
93        Box::pin(async move {
94            let array = match std::mem::take(&mut exchange.input.body) {
95                camel_api::body::Body::Json(serde_json::Value::Array(arr)) => arr,
96                _ => {
97                    return Err(CamelError::ProcessorError(
98                        "sort requires an array body (Body::Json(Value::Array))".into(),
99                    ));
100                }
101            };
102
103            let mut indexed: Vec<(SortKey, serde_json::Value)> = Vec::with_capacity(array.len());
104            for element in array {
105                let key = expression(&element)?;
106                indexed.push((key, element));
107            }
108
109            if reverse {
110                indexed.sort_by(|a, b| b.0.cmp(&a.0));
111            } else {
112                indexed.sort_by(|a, b| a.0.cmp(&b.0));
113            }
114
115            let sorted: Vec<serde_json::Value> = indexed.into_iter().map(|(_, v)| v).collect();
116            exchange.input.body = camel_api::body::Body::Json(serde_json::Value::Array(sorted));
117            Ok(exchange)
118        })
119    }
120}
121
#[cfg(test)]
mod tests {
    use super::*;
    use camel_api::{Exchange, Message, body::Body};
    use serde_json::json;
    use tower::ServiceExt;

    /// Key extractor that uses each element itself as its sort key.
    fn identity_key() -> SortExpression {
        let expression: SortExpression = Arc::new(|value| Ok(SortKey(value.clone())));
        expression
    }

    /// Runs a fresh `SortService` once over an exchange carrying `body`.
    async fn run_sort(
        body: Body,
        expression: SortExpression,
        reverse: bool,
    ) -> Result<Exchange, CamelError> {
        let service = SortService::new(expression, reverse);
        service.oneshot(Exchange::new(Message::new(body))).await
    }

    #[tokio::test]
    async fn ascending_numeric_sort() {
        let sorted = run_sort(Body::Json(json!([3, 1, 2])), identity_key(), false)
            .await
            .unwrap();
        assert_eq!(sorted.input.body, Body::Json(json!([1, 2, 3])));
    }

    #[tokio::test]
    async fn descending_numeric_sort() {
        let sorted = run_sort(Body::Json(json!([3, 1, 2])), identity_key(), true)
            .await
            .unwrap();
        assert_eq!(sorted.input.body, Body::Json(json!([3, 2, 1])));
    }

    #[tokio::test]
    async fn string_sort() {
        let input = Body::Json(json!(["banana", "apple", "cherry"]));
        let sorted = run_sort(input, identity_key(), false).await.unwrap();
        assert_eq!(
            sorted.input.body,
            Body::Json(json!(["apple", "banana", "cherry"]))
        );
    }

    #[tokio::test]
    async fn empty_array_passthrough() {
        let sorted = run_sort(Body::Json(json!([])), identity_key(), false)
            .await
            .unwrap();
        assert_eq!(sorted.input.body, Body::Json(json!([])));
    }

    #[tokio::test]
    async fn non_array_body_errors() {
        let outcome = run_sort(Body::Text("hello".to_string()), identity_key(), false).await;
        assert!(matches!(outcome, Err(CamelError::ProcessorError(_))));
    }

    #[tokio::test]
    async fn array_key_errors() {
        // The extractor itself refuses array elements; the failure must surface to the caller.
        let rejecting: SortExpression = Arc::new(|value| {
            if value.is_array() {
                Err(CamelError::ProcessorError("array key rejected".into()))
            } else {
                Ok(SortKey(value.clone()))
            }
        });
        let outcome = run_sort(Body::Json(json!([[1, 2], 3])), rejecting, false).await;
        assert!(outcome.is_err());
    }

    #[tokio::test]
    async fn mixed_type_sort_key_order() {
        // Tier order: Null < Bool < Number < String.
        let input = Body::Json(json!(["str", null, false, 42, true, 1]));
        let sorted = run_sort(input, identity_key(), false).await.unwrap();
        // Expected: null, false, true, 1, 42, "str"
        assert_eq!(
            sorted.input.body,
            Body::Json(json!([null, false, true, 1, 42, "str"]))
        );
    }
}