1use std::ffi::CStr;
21use std::fmt;
22use std::future::Future;
23use std::pin::Pin;
24
25use anyhow::bail;
26use anyhow::Context;
27use anyhow::Result;
28use bytes::Buf;
29use futures::future::FutureExt;
30
31use crate::serialize;
32use crate::ApplicationException;
33use crate::BufExt;
34use crate::ContextStack;
35use crate::Deserialize;
36use crate::MessageType;
37use crate::Protocol;
38use crate::ProtocolEncodedFinal;
39use crate::ProtocolReader;
40use crate::ProtocolWriter;
41use crate::RequestContext;
42use crate::ResultInfo;
43use crate::ResultType;
44use crate::Serialize;
45use crate::SerializedMessage;
46
/// Writes the textual form of an enum value to `formatter`.
///
/// `variants_by_number` is a table of `(name, number)` pairs. It is searched
/// with a binary search keyed on the number, so it must be sorted in
/// ascending order of that number for the lookup to be reliable.
///
/// When `number` is present in the table the matching name is written;
/// otherwise the number itself is written using its `Display` form.
pub fn enum_display(
    variants_by_number: &[(&str, i32)],
    formatter: &mut fmt::Formatter,
    number: i32,
) -> fmt::Result {
    let lookup = variants_by_number.binary_search_by_key(&number, |&(_, value)| value);

    if let Ok(index) = lookup {
        let (label, _) = variants_by_number[index];
        formatter.write_str(label)
    } else {
        // Unknown discriminant: fall back to the raw number.
        write!(formatter, "{}", number)
    }
}
58
59pub fn enum_from_str(
61 variants_by_name: &[(&str, i32)],
62 value: &str,
63 type_name: &'static str,
64) -> Result<i32> {
65 match variants_by_name.binary_search_by_key(&value, |entry| entry.0) {
66 Ok(i) => Ok(variants_by_name[i].1),
67 Err(_) => bail!("Unable to parse {} as {}", value, type_name),
68 }
69}
70
/// Returns the name of the static type of the referenced value, as reported
/// by [`std::any::type_name`].
///
/// The value itself is never inspected; only its compile-time type `T` is
/// used. `T` may be unsized, so this also works for `str`, slices and trait
/// objects passed by reference.
pub fn type_name_of_val<T: ?Sized>(_: &T) -> &'static str {
    std::any::type_name::<T>()
}
74
75pub fn buf_len<B: Buf>(b: &B) -> anyhow::Result<u32> {
76 let length: usize = b.remaining();
77 let length: u32 = length.try_into().with_context(|| {
78 format!("Unable to report a buffer length of {length} bytes as a `u32`")
79 })?;
80 Ok(length)
81}
82
83pub fn serialize_result_envelope<P, CTXT, RES>(
85 name: &str,
86 name_cstr: &<CTXT::ContextStack as ContextStack>::Name,
87 seqid: u32,
88 rctxt: &CTXT,
89 ctx_stack: &mut CTXT::ContextStack,
90 res: RES,
91) -> anyhow::Result<ProtocolEncodedFinal<P>>
92where
93 P: Protocol,
94 RES: ResultInfo + Serialize<P::Sizer> + Serialize<P::Serializer>,
95 CTXT: RequestContext<Name = CStr>,
96 <CTXT as RequestContext>::ContextStack: ContextStack<Frame = P::Frame>,
97 ProtocolEncodedFinal<P>: Clone + Buf + BufExt,
98{
99 let res_type = res.result_type();
100
101 if matches!(res_type, ResultType::Error | ResultType::Exception) {
102 assert_eq!(res.exn_is_declared(), res_type == ResultType::Error);
103
104 rctxt.set_user_exception_header(res.exn_name(), &res.exn_value())?;
105 }
106
107 ctx_stack.pre_write()?;
108 let envelope = serialize!(P, |p| {
109 p.write_message_begin(name, res_type.message_type(), seqid);
110 res.write(p);
111 p.write_message_end();
112 });
113
114 ctx_stack.on_write_data(SerializedMessage {
115 protocol: P::PROTOCOL_ID,
116 method_name: name_cstr,
117 buffer: envelope.clone(),
118 })?;
119 let bytes_written = buf_len(&envelope)?;
120 ctx_stack.post_write(bytes_written)?;
121
122 Ok(envelope)
123}
124
125pub fn serialize_stream_item<P, RES>(res: RES) -> anyhow::Result<ProtocolEncodedFinal<P>>
126where
127 P: Protocol,
128 RES: ResultInfo + Serialize<P::Sizer> + Serialize<P::Serializer>,
129{
130 Ok(serialize!(P, |p| {
131 res.write(p);
132 }))
133}
134
135pub fn serialize_request_envelope<P, ARGS>(
137 name: &str,
138 args: &ARGS,
139) -> anyhow::Result<ProtocolEncodedFinal<P>>
140where
141 P: Protocol,
142 ARGS: Serialize<P::Sizer> + Serialize<P::Serializer>,
143{
144 let envelope = serialize!(P, |p| {
145 p.write_message_begin(name, MessageType::Call, 0);
149 args.write(p);
150 p.write_message_end();
151 });
152
153 Ok(envelope)
154}
155
156pub fn deserialize_response_envelope<P, T>(
159 de: &mut P::Deserializer,
160) -> anyhow::Result<Result<T, ApplicationException>>
161where
162 P: Protocol,
163 T: Deserialize<P::Deserializer>,
164{
165 let (_, message_type, _) = de.read_message_begin(|_| ())?;
166
167 let res = match message_type {
168 MessageType::Reply => Ok(T::read(de)?),
169 MessageType::Exception => Err(ApplicationException::read(de)?),
170 MessageType::Call | MessageType::Oneway | MessageType::InvalidMessageType => {
171 bail!("Unwanted message type `{:?}`", message_type)
172 }
173 };
174
175 de.read_message_end()?;
176
177 Ok(res)
178}
179
/// Strategy for running a synchronous closure and obtaining its result as a
/// boxed, `Send` future.
///
/// Implementations decide where and when `func` actually runs; the trait
/// only fixes the shape of the call.
pub trait Spawner: 'static {
    /// Arranges for `func` to be run and returns a future that resolves to
    /// the value it produced.
    fn spawn<F, R>(func: F) -> Pin<Box<dyn Future<Output = R> + Send>>
    where
        R: Send + 'static,
        F: Send + 'static + FnOnce() -> R;
}
187
188pub struct NoopSpawner;
190impl Spawner for NoopSpawner {
191 #[inline]
192 fn spawn<F, R>(func: F) -> Pin<Box<dyn Future<Output = R> + Send>>
193 where
194 F: FnOnce() -> R + Send + 'static,
195 R: Send + 'static,
196 {
197 async { func() }.boxed()
198 }
199}
200
201pub async fn async_deserialize_response_envelope<P, T, S>(
202 de: P::Deserializer,
203) -> anyhow::Result<(Result<T, ApplicationException>, P::Deserializer)>
204where
205 P: Protocol,
206 P::Deserializer: Send,
207 T: Deserialize<P::Deserializer> + Send + 'static,
208 S: Spawner,
209{
210 S::spawn(move || {
211 let mut de = de;
212 let res = deserialize_response_envelope::<P, T>(&mut de);
213 res.map(|res| (res, de))
214 })
215 .await
216}