1use futuresdr_types::BlockDescription;
2use futuresdr_types::BlockId;
3use futuresdr_types::FlowgraphDescription;
4use futuresdr_types::Pmt;
5use futuresdr_types::PortId;
6use reqwest::Client;
7use reqwest::IntoUrl;
8use serde::Deserialize;
9
10use crate::Error;
11
12async fn get<T: for<'a> Deserialize<'a>>(client: Client, url: impl IntoUrl) -> Result<T, Error> {
13 Ok(client.get(url).send().await?.json::<T>().await?)
14}
15
16pub struct Remote {
18 client: Client,
19 url: String,
20}
21
22impl Remote {
23 pub fn new<I: Into<String>>(url: I) -> Self {
25 Self {
26 client: Client::new(),
27 url: url.into(),
28 }
29 }
30
31 pub async fn flowgraph(&self, id: usize) -> Result<Flowgraph, Error> {
33 let fgs = self.flowgraphs().await?;
34 fgs.iter()
35 .find(|x| x.id == id)
36 .cloned()
37 .ok_or(Error::FlowgraphId(id))
38 }
39
40 pub async fn flowgraphs(&self) -> Result<Vec<Flowgraph>, Error> {
42 let ids: Vec<usize> = get(self.client.clone(), format!("{}/api/fg/", self.url)).await?;
43 let mut v = Vec::new();
44
45 for i in ids.into_iter() {
46 let fg: FlowgraphDescription =
47 get(self.client.clone(), format!("{}/api/fg/{}/", self.url, i)).await?;
48 v.push(fg);
49 }
50
51 let v = v
52 .into_iter()
53 .enumerate()
54 .map(|(i, f)| Flowgraph {
55 id: i,
56 description: f,
57 client: self.client.clone(),
58 url: self.url.clone(),
59 })
60 .collect();
61
62 Ok(v)
63 }
64}
65
66#[derive(Clone, Debug)]
68pub struct Flowgraph {
69 id: usize,
70 description: FlowgraphDescription,
71 client: Client,
72 url: String,
73}
74
75impl Flowgraph {
76 pub async fn update(&mut self) -> Result<(), Error> {
78 self.description = get(
79 self.client.clone(),
80 format!("{}/api/fg/{}/", self.url, self.id),
81 )
82 .await?;
83 Ok(())
84 }
85
86 pub fn blocks(&self) -> Vec<Block> {
88 self.description
89 .blocks
90 .iter()
91 .map(|d| Block {
92 description: d.clone(),
93 client: self.client.clone(),
94 url: self.url.clone(),
95 flowgraph_id: self.id,
96 })
97 .collect()
98 }
99
100 pub fn block(&self, id: BlockId) -> Option<Block> {
104 self.block_by(|d| d.id == id)
105 }
106
107 pub fn block_by_name(&self, name: &str) -> Option<Block> {
111 self.block_by(|d| d.instance_name == name)
112 }
113
114 pub fn block_by(&self, pred: impl Fn(&BlockDescription) -> bool) -> Option<Block> {
119 self.description
120 .blocks
121 .iter()
122 .find(|d| pred(d))
123 .map(|d| Block {
124 description: d.clone(),
125 client: self.client.clone(),
126 url: self.url.clone(),
127 flowgraph_id: self.id,
128 })
129 }
130
131 pub fn message_connections(&self) -> Vec<Connection> {
133 self.description
134 .message_edges
135 .iter()
136 .map(|d| Connection {
137 connection_type: ConnectionType::Message,
138 src_block: self.block(d.0).unwrap(),
139 src_port: d.1.clone(),
140 dst_block: self.block(d.2).unwrap(),
141 dst_port: d.3.clone(),
142 })
143 .collect()
144 }
145
146 pub fn stream_connections(&self) -> Vec<Connection> {
148 self.description
149 .stream_edges
150 .iter()
151 .map(|d| Connection {
152 connection_type: ConnectionType::Stream,
153 src_block: self.block(d.0).unwrap(),
154 src_port: d.1.clone(),
155 dst_block: self.block(d.2).unwrap(),
156 dst_port: d.3.clone(),
157 })
158 .collect()
159 }
160}
161
162impl std::fmt::Display for Flowgraph {
163 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
164 write!(
165 f,
166 "Flowgraph {} (B {} / S {} / M {})",
167 self.id,
168 self.description.blocks.len(),
169 self.description.stream_edges.len(),
170 self.description.message_edges.len()
171 )
172 }
173}
174
175#[derive(Clone, Debug)]
177pub enum Handler {
178 Id(usize),
180 Name(String),
182}
183
184#[derive(Clone, Debug)]
186pub struct Block {
187 description: BlockDescription,
188 client: Client,
189 url: String,
190 flowgraph_id: usize,
191}
192
193impl Block {
194 pub async fn update(&mut self) -> Result<(), Error> {
196 self.description = get(
197 self.client.clone(),
198 format!(
199 "{}/api/fg/{}/block/{}/",
200 self.url, self.flowgraph_id, self.description.id.0
201 ),
202 )
203 .await?;
204 Ok(())
205 }
206
207 pub async fn call(&self, handler: Handler) -> Result<Pmt, Error> {
212 self.callback(handler, Pmt::Null).await
213 }
214
215 pub async fn callback(&self, handler: Handler, pmt: Pmt) -> Result<Pmt, Error> {
217 let url = match handler {
218 Handler::Name(n) => format!(
219 "{}/api/fg/{}/block/{}/call/{}/",
220 &self.url, self.flowgraph_id, self.description.id.0, n
221 ),
222 Handler::Id(i) => format!(
223 "{}/api/fg/{}/block/{}/call/{}/",
224 &self.url, self.flowgraph_id, self.description.id.0, i
225 ),
226 };
227
228 Ok(self
229 .client
230 .post(url)
231 .json(&pmt)
232 .send()
233 .await?
234 .json::<Pmt>()
235 .await?)
236 }
237
238 pub fn description(&self) -> &BlockDescription {
240 &self.description
241 }
242}
243
244impl std::fmt::Display for Block {
245 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
246 write!(
247 f,
248 "{} ({}, {})",
249 &self.description.instance_name, &self.description.type_name, self.description.id.0,
250 )
251 }
252}
253
254#[derive(Debug, Clone)]
256pub enum ConnectionType {
257 Stream,
259 Message,
261}
262
263#[derive(Clone, Debug)]
265pub struct Connection {
266 connection_type: ConnectionType,
267 src_block: Block,
268 src_port: PortId,
269 dst_block: Block,
270 dst_port: PortId,
271}
272
273impl Connection {
274 pub fn connection_type(&self) -> ConnectionType {
276 self.connection_type.clone()
277 }
278 pub fn src_block(&self) -> &Block {
280 &self.src_block
281 }
282 pub fn src_port(&self) -> &PortId {
284 &self.src_port
285 }
286 pub fn dst_block(&self) -> &Block {
288 &self.dst_block
289 }
290 pub fn dst_port(&self) -> &PortId {
292 &self.dst_port
293 }
294}
295
296impl std::fmt::Display for Connection {
297 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
298 match self.connection_type {
299 ConnectionType::Stream => write!(
300 f,
301 "{}.{} > {}.{}",
302 self.src_block.description.instance_name,
303 self.src_port.name(),
304 self.dst_block.description.instance_name,
305 self.dst_port.name()
306 ),
307 ConnectionType::Message => write!(
308 f,
309 "{}.{} | {}.{}",
310 self.src_block.description.instance_name,
311 self.src_port.name(),
312 self.dst_block.description.instance_name,
313 self.dst_port.name()
314 ),
315 }
316 }
317}
318
319#[cfg(test)]
320mod tests {
321 use crate::Flowgraph;
322 use futuresdr_types::BlockDescription;
323 use futuresdr_types::BlockId;
324 use futuresdr_types::FlowgraphDescription;
325 use futuresdr_types::PortId;
326
327 fn block(id: usize, name: &str) -> BlockDescription {
328 BlockDescription {
329 id: BlockId(id),
330 type_name: "test_block".to_string(),
331 instance_name: name.to_string(),
332 stream_inputs: vec!["in".to_string()],
333 stream_outputs: vec!["out".to_string()],
334 message_inputs: vec!["command".to_string()],
335 message_outputs: vec!["message".to_string()],
336 blocking: false,
337 }
338 }
339
340 #[test]
341 fn find_block() {
342 let fg = Flowgraph {
343 id: 0,
344 description: FlowgraphDescription {
345 blocks: vec![block(0, "a"), block(1, "b")],
346 stream_edges: vec![(
347 BlockId(0),
348 PortId::new("output"),
349 BlockId(1),
350 PortId::new("input"),
351 )],
352 message_edges: vec![(
353 BlockId(1),
354 PortId::new("out"),
355 BlockId(0),
356 PortId::new("in"),
357 )],
358 },
359 client: reqwest::Client::new(),
360 url: "http://localhost".to_string(),
361 };
362
363 assert_eq!(
364 fg.block(BlockId(0)).map(|b| b.description.instance_name),
365 Some("a".to_string())
366 );
367 assert_eq!(
368 fg.block_by_name("b").map(|b| b.description.id),
369 Some(BlockId(1))
370 );
371 assert!(fg.block_by(|d| d.type_name == "test_block").is_some());
372 assert!(fg.block_by(|d| d.type_name == "foo").is_none());
373 }
374}