Skip to main content

turul_a2a_aws_lambda/
no_streaming.rs

1//! NoStreamingLayer — rejects streaming paths with UnsupportedOperationError.
2//!
3//! Per ADR-008: streaming on Lambda uses the existing A2A error contract,
4//! not a Lambda-specific 501.
5
6use std::task::{Context, Poll};
7
8use axum::body::Body;
9use http::{Request, Response};
10use tower::{Layer, Service};
11
12use turul_a2a::error::A2aError;
13
/// Tower [`Layer`] that rejects streaming paths on Lambda.
///
/// The layer carries no configuration: applying it wraps a service in
/// [`NoStreamingService`], which answers streaming requests itself and
/// forwards everything else to the wrapped service.
#[derive(Debug, Clone, Copy, Default)]
pub struct NoStreamingLayer;
17
18impl<S> Layer<S> for NoStreamingLayer {
19    type Service = NoStreamingService<S>;
20
21    fn layer(&self, inner: S) -> Self::Service {
22        NoStreamingService { inner }
23    }
24}
25
/// Service produced by `NoStreamingLayer`.
///
/// Holds the wrapped service that receives every request this middleware
/// does not answer itself.
#[derive(Debug, Clone)]
pub struct NoStreamingService<S> {
    /// The wrapped downstream service.
    inner: S,
}
30
/// Returns `true` when `path` ends with one of the streaming verbs
/// (`:stream` or `:subscribe`).
///
/// The match is a plain, case-sensitive suffix comparison on the whole path,
/// so any prefix segments (e.g. a tenant) are irrelevant.
fn is_streaming_path(path: &str) -> bool {
    const STREAMING_SUFFIXES: [&str; 2] = [":stream", ":subscribe"];

    STREAMING_SUFFIXES
        .iter()
        .any(|&suffix| path.ends_with(suffix))
}
34
35impl<S> Service<Request<Body>> for NoStreamingService<S>
36where
37    S: Service<Request<Body>, Response = Response<Body>> + Clone + Send + 'static,
38    S::Future: Send,
39{
40    type Response = Response<Body>;
41    type Error = S::Error;
42    type Future = std::pin::Pin<
43        Box<dyn std::future::Future<Output = Result<Self::Response, Self::Error>> + Send>,
44    >;
45
46    fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
47        self.inner.poll_ready(cx)
48    }
49
50    fn call(&mut self, req: Request<Body>) -> Self::Future {
51        let path = req.uri().path().to_string();
52
53        if is_streaming_path(&path) {
54            let err = A2aError::UnsupportedOperation {
55                message:
56                    "Streaming is not supported on Lambda. Use request/response endpoints instead."
57                        .into(),
58            };
59            let body = err.to_http_error_body();
60            let status = axum::http::StatusCode::from_u16(err.http_status())
61                .unwrap_or(axum::http::StatusCode::BAD_REQUEST);
62
63            return Box::pin(async move {
64                Ok(Response::builder()
65                    .status(status)
66                    .header(http::header::CONTENT_TYPE, "application/json")
67                    .body(Body::from(serde_json::to_string(&body).unwrap_or_default()))
68                    .unwrap())
69            });
70        }
71
72        let mut inner = self.inner.clone();
73        Box::pin(async move { inner.call(req).await })
74    }
75}
76
#[cfg(test)]
mod tests {
    use super::*;

    /// Streaming verbs are flagged with or without a tenant prefix; all other paths are not.
    #[test]
    fn streaming_paths_detected() {
        let streaming = [
            "/message:stream",
            "/tenant/message:stream",
            "/tasks/abc:subscribe",
            "/tenant/tasks/abc:subscribe",
        ];
        for path in streaming.iter() {
            assert!(is_streaming_path(path), "expected streaming path: {}", path);
        }

        let non_streaming = [
            "/message:send",
            "/tasks/abc:cancel",
            "/tasks",
            "/.well-known/agent-card.json",
        ];
        for path in non_streaming.iter() {
            assert!(!is_streaming_path(path), "expected non-streaming path: {}", path);
        }
    }
}