Skip to main content

mcpr_core/proxy/pipeline/middlewares/
session_delete.rs

1//! Request-side middleware: on `DELETE` + `mcp-session-id`, tear down
2//! the local session and emit `SessionEnd`, then let the transport
3//! forward the DELETE upstream. (No short-circuit — forwarding the
4//! DELETE lets the upstream release its own session state.)
5
6use async_trait::async_trait;
7use axum::http::Method;
8
9use crate::event::{ProxyEvent, SessionEndEvent};
10use crate::protocol::session::SessionStore;
11use crate::proxy::pipeline::middleware::{Flow, RequestMiddleware};
12use crate::proxy::pipeline::values::{Context, Request};
13
14pub struct SessionDeleteMiddleware;
15
16#[async_trait]
17impl RequestMiddleware for SessionDeleteMiddleware {
18    fn name(&self) -> &'static str {
19        "session_delete"
20    }
21
22    async fn on_request(&self, req: Request, cx: &mut Context) -> Flow {
23        let Request::Mcp(ref mcp) = req else {
24            return Flow::Continue(req);
25        };
26        if cx.intake.http_method != Method::DELETE {
27            return Flow::Continue(req);
28        }
29        let Some(sid) = mcp.session_hint.as_ref() else {
30            return Flow::Continue(req);
31        };
32
33        let state = cx.intake.proxy.clone();
34        state.sessions.remove(sid.as_str()).await;
35        state
36            .event_bus
37            .emit(ProxyEvent::SessionEnd(SessionEndEvent {
38                session_id: sid.as_str().to_string(),
39                ts: chrono::Utc::now().timestamp_millis(),
40            }));
41
42        Flow::Continue(req)
43    }
44}
45
46#[cfg(test)]
47#[allow(non_snake_case)]
48mod tests {
49    use super::*;
50
51    use axum::body::Body;
52    use axum::http::HeaderMap;
53    use serde_json::Value;
54
55    use crate::proxy::pipeline::middlewares::test_support::{
56        mcp_delete_request, mcp_request, test_context_with_method, test_proxy_with_sink,
57    };
58    use crate::proxy::pipeline::values::RawRequest;
59
60    #[tokio::test]
61    async fn on_request__delete_with_session_emits_and_removes() {
62        let (proxy, sink, handle) = test_proxy_with_sink();
63        proxy.sessions.create("sess-1").await;
64        let mut cx = test_context_with_method(proxy.clone(), Method::DELETE);
65        let req = mcp_delete_request(Some("sess-1"));
66
67        let flow = SessionDeleteMiddleware.on_request(req, &mut cx).await;
68        assert!(matches!(flow, Flow::Continue(Request::Mcp(_))));
69        assert!(proxy.sessions.get("sess-1").await.is_none());
70
71        handle.shutdown().await;
72        let saw_end = sink
73            .snapshot()
74            .iter()
75            .any(|e| matches!(e, ProxyEvent::SessionEnd(s) if s.session_id == "sess-1"));
76        assert!(saw_end);
77    }
78
79    #[tokio::test]
80    async fn on_request__delete_without_session_hint_is_noop() {
81        let (proxy, sink, handle) = test_proxy_with_sink();
82        let mut cx = test_context_with_method(proxy.clone(), Method::DELETE);
83        let req = mcp_delete_request(None);
84
85        SessionDeleteMiddleware.on_request(req, &mut cx).await;
86        handle.shutdown().await;
87        assert!(sink.snapshot().is_empty());
88    }
89
90    #[tokio::test]
91    async fn on_request__non_delete_method_is_noop() {
92        let (proxy, sink, handle) = test_proxy_with_sink();
93        proxy.sessions.create("sess-2").await;
94        let mut cx = test_context_with_method(proxy.clone(), Method::POST);
95        let req = mcp_request("tools/list", Value::Null, Some("sess-2"));
96
97        SessionDeleteMiddleware.on_request(req, &mut cx).await;
98        assert!(proxy.sessions.get("sess-2").await.is_some());
99        handle.shutdown().await;
100        assert!(sink.snapshot().is_empty());
101    }
102
103    #[tokio::test]
104    async fn on_request__non_mcp_is_noop() {
105        let (proxy, sink, handle) = test_proxy_with_sink();
106        let mut cx = test_context_with_method(proxy, Method::DELETE);
107        let req = Request::Raw(RawRequest {
108            method: Method::DELETE,
109            path: "/something".into(),
110            body: Body::empty(),
111            headers: HeaderMap::new(),
112        });
113
114        SessionDeleteMiddleware.on_request(req, &mut cx).await;
115        handle.shutdown().await;
116        assert!(sink.snapshot().is_empty());
117    }
118
119    #[tokio::test]
120    async fn on_request__unknown_session_still_emits() {
121        let (proxy, sink, handle) = test_proxy_with_sink();
122        let mut cx = test_context_with_method(proxy, Method::DELETE);
123        let req = mcp_delete_request(Some("never-existed"));
124
125        SessionDeleteMiddleware.on_request(req, &mut cx).await;
126        handle.shutdown().await;
127        let saw_end = sink
128            .snapshot()
129            .iter()
130            .any(|e| matches!(e, ProxyEvent::SessionEnd(s) if s.session_id == "never-existed"));
131        assert!(saw_end);
132    }
133}