agent_client_protocol/
util.rs1use futures::{
4 future::BoxFuture,
5 stream::{Stream, StreamExt},
6};
7
8mod typed;
9pub use typed::{MatchDispatch, MatchDispatchFrom, TypeNotification};
10
11pub fn json_cast<N, M>(params: N) -> Result<M, crate::Error>
13where
14 N: serde::Serialize,
15 M: serde::de::DeserializeOwned,
16{
17 let json = serde_json::to_value(params).map_err(|e| {
18 crate::Error::parse_error().data(serde_json::json!({
19 "error": e.to_string(),
20 "phase": "serialization"
21 }))
22 })?;
23 let m = serde_json::from_value(json.clone()).map_err(|e| {
24 crate::Error::parse_error().data(serde_json::json!({
25 "error": e.to_string(),
26 "json": json,
27 "phase": "deserialization"
28 }))
29 })?;
30 Ok(m)
31}
32
33pub fn json_cast_params<N, M>(params: N) -> Result<M, crate::Error>
40where
41 N: serde::Serialize,
42 M: serde::de::DeserializeOwned,
43{
44 let json = serde_json::to_value(params).map_err(|e| {
45 crate::Error::internal_error().data(serde_json::json!({
46 "error": e.to_string(),
47 "phase": "serialization"
48 }))
49 })?;
50 let m = serde_json::from_value(json.clone()).map_err(|e| {
51 crate::Error::invalid_params().data(serde_json::json!({
52 "error": e.to_string(),
53 "json": json,
54 "phase": "deserialization"
55 }))
56 })?;
57 Ok(m)
58}
59
60pub fn internal_error(message: impl ToString) -> crate::Error {
62 crate::Error::internal_error().data(message.to_string())
63}
64
65pub fn parse_error(message: impl ToString) -> crate::Error {
67 crate::Error::parse_error().data(message.to_string())
68}
69
70pub(crate) fn id_to_json(id: &jsonrpcmsg::Id) -> serde_json::Value {
72 match id {
73 jsonrpcmsg::Id::Number(n) => serde_json::Value::Number((*n).into()),
74 jsonrpcmsg::Id::String(s) => serde_json::Value::String(s.clone()),
75 jsonrpcmsg::Id::Null => serde_json::Value::Null,
76 }
77}
78
79pub(crate) fn instrumented_with_connection_name<F>(
80 name: String,
81 task: F,
82) -> tracing::instrument::Instrumented<F> {
83 use tracing::Instrument;
84
85 task.instrument(tracing::info_span!("connection", name = name))
86}
87
88pub(crate) async fn instrument_with_connection_name<R>(
89 name: Option<String>,
90 task: impl Future<Output = R>,
91) -> R {
92 if let Some(name) = name {
93 instrumented_with_connection_name(name.clone(), task).await
94 } else {
95 task.await
96 }
97}
98
99#[must_use]
101pub fn into_jsonrpc_error(err: crate::Error) -> crate::jsonrpcmsg::Error {
102 crate::jsonrpcmsg::Error {
103 code: err.code.into(),
104 message: err.message,
105 data: err.data,
106 }
107}
108
109pub async fn both<E>(
112 a: impl Future<Output = Result<(), E>>,
113 b: impl Future<Output = Result<(), E>>,
114) -> Result<(), E> {
115 let ((), ()) = futures::future::try_join(a, b).await?;
116 Ok(())
117}
118
119pub async fn run_until<T, E>(
125 background: impl Future<Output = Result<(), E>>,
126 foreground: impl Future<Output = Result<T, E>>,
127) -> Result<T, E> {
128 use futures::future::{Either, select};
129 use std::pin::pin;
130
131 match select(pin!(background), pin!(foreground)).await {
132 Either::Left((bg_result, fg_future)) => {
133 bg_result?; fg_future.await
136 }
137 Either::Right((fg_result, _bg_future)) => {
138 fg_result
140 }
141 }
142}
143
144pub async fn process_stream_concurrently<T, F>(
153 stream: impl Stream<Item = T>,
154 process_fn: F,
155 process_fn_hack: impl for<'a> Fn(&'a F, T) -> BoxFuture<'a, Result<(), crate::Error>>,
156) -> Result<(), crate::Error>
157where
158 F: AsyncFn(T) -> Result<(), crate::Error>,
159{
160 use std::pin::pin;
161
162 use futures::stream::{FusedStream, FuturesUnordered};
163 use futures_concurrency::future::Race;
164
165 enum Event<T> {
166 NewItem(Option<T>),
167 FutureCompleted(Option<Result<(), crate::Error>>),
168 }
169
170 let mut stream = pin!(stream.fuse());
171 let mut futures: FuturesUnordered<_> = FuturesUnordered::new();
172
173 loop {
174 if futures.is_empty() {
176 match stream.next().await {
177 Some(item) => futures.push(process_fn_hack(&process_fn, item)),
178 None => return Ok(()),
179 }
180 continue;
181 }
182
183 if stream.is_terminated() {
185 while let Some(result) = futures.next().await {
186 result?;
187 }
188 return Ok(());
189 }
190
191 let event = (async { Event::NewItem(stream.next().await) }, async {
193 Event::FutureCompleted(futures.next().await)
194 })
195 .race()
196 .await;
197
198 match event {
199 Event::NewItem(Some(item)) => {
200 futures.push(process_fn_hack(&process_fn, item));
201 }
202 Event::FutureCompleted(Some(result)) => {
203 result?;
204 }
205 Event::NewItem(None) | Event::FutureCompleted(None) => {
206 }
209 }
210 }
211}