turul_a2a_aws_lambda/
no_streaming.rs1use std::task::{Context, Poll};
7
8use axum::body::Body;
9use http::{Request, Response};
10use tower::{Layer, Service};
11
12use turul_a2a::error::A2aError;
13
/// Tower [`Layer`] that wraps a service in [`NoStreamingService`].
///
/// The layer carries no configuration; applying it only stores the wrapped
/// service inside the resulting [`NoStreamingService`].
#[derive(Clone, Debug, Default)]
pub struct NoStreamingLayer;
17
18impl<S> Layer<S> for NoStreamingLayer {
19 type Service = NoStreamingService<S>;
20
21 fn layer(&self, inner: S) -> Self::Service {
22 NoStreamingService { inner }
23 }
24}
25
/// Middleware service produced by [`NoStreamingLayer`].
///
/// Requests whose path is classified as streaming are answered directly with
/// an error response; every other request is forwarded to `inner`.
#[derive(Clone, Debug)]
pub struct NoStreamingService<S> {
    /// The wrapped service that handles all non-streaming requests.
    inner: S,
}
30
/// Returns `true` when `path` ends with one of the streaming action
/// suffixes (`:stream` or `:subscribe`).
///
/// The match is a plain, case-sensitive suffix comparison on the path as
/// given; nothing is normalised or decoded first.
fn is_streaming_path(path: &str) -> bool {
    const STREAMING_SUFFIXES: [&str; 2] = [":stream", ":subscribe"];

    STREAMING_SUFFIXES
        .iter()
        .any(|suffix| path.ends_with(suffix))
}
34
35impl<S> Service<Request<Body>> for NoStreamingService<S>
36where
37 S: Service<Request<Body>, Response = Response<Body>> + Clone + Send + 'static,
38 S::Future: Send,
39{
40 type Response = Response<Body>;
41 type Error = S::Error;
42 type Future = std::pin::Pin<
43 Box<dyn std::future::Future<Output = Result<Self::Response, Self::Error>> + Send>,
44 >;
45
46 fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
47 self.inner.poll_ready(cx)
48 }
49
50 fn call(&mut self, req: Request<Body>) -> Self::Future {
51 let path = req.uri().path().to_string();
52
53 if is_streaming_path(&path) {
54 let err = A2aError::UnsupportedOperation {
55 message:
56 "Streaming is not supported on Lambda. Use request/response endpoints instead."
57 .into(),
58 };
59 let body = err.to_http_error_body();
60 let status = axum::http::StatusCode::from_u16(err.http_status())
61 .unwrap_or(axum::http::StatusCode::BAD_REQUEST);
62
63 return Box::pin(async move {
64 Ok(Response::builder()
65 .status(status)
66 .header(http::header::CONTENT_TYPE, "application/json")
67 .body(Body::from(serde_json::to_string(&body).unwrap_or_default()))
68 .unwrap())
69 });
70 }
71
72 let mut inner = self.inner.clone();
73 Box::pin(async move { inner.call(req).await })
74 }
75}
76
#[cfg(test)]
mod tests {
    use super::*;

    /// Checks the path classification against known streaming and
    /// non-streaming request paths.
    #[test]
    fn streaming_paths_detected() {
        let streaming = [
            "/message:stream",
            "/tenant/message:stream",
            "/tasks/abc:subscribe",
            "/tenant/tasks/abc:subscribe",
        ];
        for path in streaming {
            assert!(is_streaming_path(path), "expected streaming path: {path}");
        }

        let regular = [
            "/message:send",
            "/tasks/abc:cancel",
            "/tasks",
            "/.well-known/agent-card.json",
        ];
        for path in regular {
            assert!(!is_streaming_path(path), "expected non-streaming path: {path}");
        }
    }
}