Skip to main content

aiway_protocol/context/
response_context.rs

1use crate::SV;
2use bytes::Bytes;
3use dashmap::DashMap;
4use std::error::Error;
5use std::fmt::Debug;
6use std::pin::Pin;
7use tokio_stream::Stream;
8
/// Item type of a streamed response body: a chunk of bytes on success, or a
/// boxed `Send + Sync` error on failure.
type StreamItem = Result<Vec<u8>, Box<dyn Error + Send + Sync>>;
10
/// Response context
#[derive(Default)]
pub struct ResponseContext {
    /// Response timestamp, in milliseconds
    pub response_ts: SV<i64>,
    /// Response status code
    pub status: SV<Option<u16>>,
    /// Response headers
    pub headers: DashMap<String, String>,
    /// Response body
    pub body: SV<Bytes>,
    /// Response stream (a pinned, boxed stream of `StreamItem` values)
    pub stream_body: SV<Option<Pin<Box<dyn Stream<Item = StreamItem> + Send>>>>,
}
25
26impl Debug for ResponseContext {
27    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
28        f.debug_struct("ResponseContext")
29            .field("response_ts", &self.response_ts)
30            .field("status", &self.status)
31            .field("headers", &self.headers)
32            .field("body", &self.body)
33            //.field("stream_body", &"Stream<Item = Vec<u8>>")
34            .finish()
35    }
36}
37
38impl ResponseContext {
39    pub fn set_response_ts(&self, ts: i64) {
40        self.response_ts.set(ts);
41    }
42
43    pub fn get_response_ts(&self) -> i64 {
44        *self.response_ts.get().unwrap_or(&0)
45    }
46
47    pub fn set_status(&self, status: u16) {
48        self.status.set(Some(status));
49    }
50
51    pub fn get_status(&self) -> Option<u16> {
52        self.status.get().and_then(|opt| *opt)
53    }
54
55    pub fn insert_header(&self, key: &str, value: &str) {
56        self.headers.insert(key.to_string(), value.to_string());
57    }
58
59    pub fn get_header(&self, key: &str) -> Option<String> {
60        self.headers.get(key).map(|v| v.value().clone())
61    }
62
63    pub fn set_headers<H: IntoIterator<Item = (String, String)>>(&self, headers: H) {
64        for (key, value) in headers {
65            self.headers.insert(key, value);
66        }
67    }
68
69    pub fn remove_header(&self, key: &str) {
70        self.headers.remove(key);
71    }
72
73    pub fn clear_headers(&self) {
74        self.headers.clear();
75    }
76
77    pub fn set_body(&self, body: Bytes) {
78        self.body.set(body)
79    }
80
81    pub fn get_body(&self) -> Option<&Bytes> {
82        self.body.get()
83    }
84
85    pub fn clear_body(&self) {
86        self.body.set(Bytes::new());
87    }
88
89    pub fn set_stream_body(&self, body: Pin<Box<dyn Stream<Item = StreamItem> + Send>>) {
90        self.stream_body.set(Some(body));
91    }
92
93    pub fn take_stream_body(&self) -> Option<Pin<Box<dyn Stream<Item = StreamItem> + Send>>> {
94        self.stream_body.take().unwrap_or_default()
95    }
96
97    pub fn is_success(&self) -> bool {
98        self.status
99            .get()
100            .map(|s| s.unwrap_or(0) >= 200 && s.unwrap_or(0) < 300)
101            .unwrap_or(false)
102    }
103
104    pub fn is_client_error(&self) -> bool {
105        self.status
106            .get()
107            .map(|s| s.unwrap_or(0) >= 400 && s.unwrap_or(0) < 500)
108            .unwrap_or(false)
109    }
110
111    pub fn is_server_error(&self) -> bool {
112        self.status
113            .get()
114            .map(|s| s.unwrap_or(0) >= 500)
115            .unwrap_or(false)
116    }
117}