futuresdr_remote/
remote.rs

1use futuresdr_types::BlockDescription;
2use futuresdr_types::BlockId;
3use futuresdr_types::FlowgraphDescription;
4use futuresdr_types::Pmt;
5use futuresdr_types::PortId;
6use reqwest::Client;
7use reqwest::IntoUrl;
8use serde::Deserialize;
9
10use crate::Error;
11
12async fn get<T: for<'a> Deserialize<'a>>(client: Client, url: impl IntoUrl) -> Result<T, Error> {
13    Ok(client.get(url).send().await?.json::<T>().await?)
14}
15
/// Connection to a remote runtime.
///
/// Bundles the HTTP client and the base URL against which the `/api/fg/` REST
/// endpoints are queried.
pub struct Remote {
    /// HTTP client used for requests; a clone is handed to every [`Flowgraph`] handle.
    client: Client,
    /// Base URL of the remote runtime; endpoint paths such as `/api/fg/` are appended to it.
    url: String,
}
21
22impl Remote {
23    /// Create a [`Remote`].
24    pub fn new<I: Into<String>>(url: I) -> Self {
25        Self {
26            client: Client::new(),
27            url: url.into(),
28        }
29    }
30
31    /// Get a specific [`Flowgraph`].
32    pub async fn flowgraph(&self, id: usize) -> Result<Flowgraph, Error> {
33        let fgs = self.flowgraphs().await?;
34        fgs.iter()
35            .find(|x| x.id == id)
36            .cloned()
37            .ok_or(Error::FlowgraphId(id))
38    }
39
40    /// Get a list of all running [`Flowgraphs`](Flowgraph).
41    pub async fn flowgraphs(&self) -> Result<Vec<Flowgraph>, Error> {
42        let ids: Vec<usize> = get(self.client.clone(), format!("{}/api/fg/", self.url)).await?;
43        let mut v = Vec::new();
44
45        for i in ids.into_iter() {
46            let fg: FlowgraphDescription =
47                get(self.client.clone(), format!("{}/api/fg/{}/", self.url, i)).await?;
48            v.push(fg);
49        }
50
51        let v = v
52            .into_iter()
53            .enumerate()
54            .map(|(i, f)| Flowgraph {
55                id: i,
56                description: f,
57                client: self.client.clone(),
58                url: self.url.clone(),
59            })
60            .collect();
61
62        Ok(v)
63    }
64}
65
/// A remote Flowgraph.
///
/// Holds a cached [`FlowgraphDescription`] together with what is needed to query the
/// remote again.
#[derive(Clone, Debug)]
pub struct Flowgraph {
    /// ID of the flowgraph, used to build the `/api/fg/{id}/` request URLs.
    id: usize,
    /// Cached description (blocks, stream edges, message edges) of the flowgraph.
    description: FlowgraphDescription,
    /// HTTP client used for requests; cloned into every [`Block`] handle.
    client: Client,
    /// Base URL of the remote runtime.
    url: String,
}
74
75impl Flowgraph {
76    /// Update the [`Flowgraph`], getting current blocks and connections.
77    pub async fn update(&mut self) -> Result<(), Error> {
78        self.description = get(
79            self.client.clone(),
80            format!("{}/api/fg/{}/", self.url, self.id),
81        )
82        .await?;
83        Ok(())
84    }
85
86    /// Get a list of the [`Blocks`](Block) of the [`Flowgraph`].
87    pub fn blocks(&self) -> Vec<Block> {
88        self.description
89            .blocks
90            .iter()
91            .map(|d| Block {
92                description: d.clone(),
93                client: self.client.clone(),
94                url: self.url.clone(),
95                flowgraph_id: self.id,
96            })
97            .collect()
98    }
99
100    /// Get a specific [`Block`](Block) of the [`Flowgraph`] by `id`.
101    ///
102    /// Returns `None` if `Block` is not found.
103    pub fn block(&self, id: BlockId) -> Option<Block> {
104        self.block_by(|d| d.id == id)
105    }
106
107    /// Get a specific [`Block`](Block) of the [`Flowgraph`] by `instance_name`.
108    ///
109    /// Returns `None` if `Block` is not found.
110    pub fn block_by_name(&self, name: &str) -> Option<Block> {
111        self.block_by(|d| d.instance_name == name)
112    }
113
114    /// Find the first [`Block`](Block) of the [`Flowgraph`] matching the given predicate
115    /// on [`BlockDescription`].
116    ///
117    /// Returns `None` if no `BlockDescription` matches given predicate.
118    pub fn block_by(&self, pred: impl Fn(&BlockDescription) -> bool) -> Option<Block> {
119        self.description
120            .blocks
121            .iter()
122            .find(|d| pred(d))
123            .map(|d| Block {
124                description: d.clone(),
125                client: self.client.clone(),
126                url: self.url.clone(),
127                flowgraph_id: self.id,
128            })
129    }
130
131    /// Get a list of all message [`Connections`](Connection) of the [`Flowgraph`].
132    pub fn message_connections(&self) -> Vec<Connection> {
133        self.description
134            .message_edges
135            .iter()
136            .map(|d| Connection {
137                connection_type: ConnectionType::Message,
138                src_block: self.block(d.0).unwrap(),
139                src_port: d.1.clone(),
140                dst_block: self.block(d.2).unwrap(),
141                dst_port: d.3.clone(),
142            })
143            .collect()
144    }
145
146    /// Get a list of all stream [`Connections`](Connection) of the [`Flowgraph`].
147    pub fn stream_connections(&self) -> Vec<Connection> {
148        self.description
149            .stream_edges
150            .iter()
151            .map(|d| Connection {
152                connection_type: ConnectionType::Stream,
153                src_block: self.block(d.0).unwrap(),
154                src_port: d.1.clone(),
155                dst_block: self.block(d.2).unwrap(),
156                dst_port: d.3.clone(),
157            })
158            .collect()
159    }
160}
161
162impl std::fmt::Display for Flowgraph {
163    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
164        write!(
165            f,
166            "Flowgraph {} (B {} / S {} / M {})",
167            self.id,
168            self.description.blocks.len(),
169            self.description.stream_edges.len(),
170            self.description.message_edges.len()
171        )
172    }
173}
174
/// Specify a message handler of a [`Block`]
///
/// A handler can be addressed either by its numeric ID or by its name.
#[derive(Clone, Debug)]
pub enum Handler {
    /// Numeric ID of the handler
    Id(usize),
    /// Name of the handler
    Name(String),
}
183
/// A [`Block`] of a [`Flowgraph`].
///
/// Holds a cached [`BlockDescription`] together with what is needed to query the
/// remote again.
#[derive(Clone, Debug)]
pub struct Block {
    /// Cached description of the block.
    description: BlockDescription,
    /// HTTP client used for requests.
    client: Client,
    /// Base URL of the remote runtime.
    url: String,
    /// ID of the flowgraph the block belongs to; part of every request URL of the block.
    flowgraph_id: usize,
}
192
193impl Block {
194    /// Update the [`Block`], retrieving a new [`BlockDescription`] from the [`Flowgraph`].
195    pub async fn update(&mut self) -> Result<(), Error> {
196        self.description = get(
197            self.client.clone(),
198            format!(
199                "{}/api/fg/{}/block/{}/",
200                self.url, self.flowgraph_id, self.description.id.0
201            ),
202        )
203        .await?;
204        Ok(())
205    }
206
207    /// Call a message handler of a [`Block`], providing it a [`Pmt::Null`](futuresdr_types::Pmt).
208    ///
209    /// This is usually used, when the caller is only interested in the return value. The handler
210    /// might, for example, just return a parameter (think `get_frequency`, `get_gain`, etc).
211    pub async fn call(&self, handler: Handler) -> Result<Pmt, Error> {
212        self.callback(handler, Pmt::Null).await
213    }
214
215    /// Call a message handler of a [`Block`] with the given [`Pmt`](futuresdr_types::Pmt).
216    pub async fn callback(&self, handler: Handler, pmt: Pmt) -> Result<Pmt, Error> {
217        let url = match handler {
218            Handler::Name(n) => format!(
219                "{}/api/fg/{}/block/{}/call/{}/",
220                &self.url, self.flowgraph_id, self.description.id.0, n
221            ),
222            Handler::Id(i) => format!(
223                "{}/api/fg/{}/block/{}/call/{}/",
224                &self.url, self.flowgraph_id, self.description.id.0, i
225            ),
226        };
227
228        Ok(self
229            .client
230            .post(url)
231            .json(&pmt)
232            .send()
233            .await?
234            .json::<Pmt>()
235            .await?)
236    }
237
238    /// BlockDescription
239    pub fn description(&self) -> &BlockDescription {
240        &self.description
241    }
242}
243
244impl std::fmt::Display for Block {
245    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
246        write!(
247            f,
248            "{} ({}, {})",
249            &self.description.instance_name, &self.description.type_name, self.description.id.0,
250        )
251    }
252}
253
/// Connection type for a [`Connection`] between [`Blocks`](Block)
#[derive(Debug, Clone)]
pub enum ConnectionType {
    /// Stream Connection, i.e., an entry of the flowgraph's stream edges
    Stream,
    /// Message Connection, i.e., an entry of the flowgraph's message edges
    Message,
}
262
/// A Connection between [`Blocks`](Block)
#[derive(Clone, Debug)]
pub struct Connection {
    /// Whether this is a stream or a message connection.
    connection_type: ConnectionType,
    /// Block the connection originates from.
    src_block: Block,
    /// Port of the source block.
    src_port: PortId,
    /// Block the connection leads to.
    dst_block: Block,
    /// Port of the destination block.
    dst_port: PortId,
}
272
impl Connection {
    /// Connection type
    ///
    /// Returns a clone of the stored [`ConnectionType`].
    pub fn connection_type(&self) -> ConnectionType {
        self.connection_type.clone()
    }
    /// Source block
    pub fn src_block(&self) -> &Block {
        &self.src_block
    }
    /// Source port
    pub fn src_port(&self) -> &PortId {
        &self.src_port
    }
    /// Destination block
    pub fn dst_block(&self) -> &Block {
        &self.dst_block
    }
    /// Destination port
    pub fn dst_port(&self) -> &PortId {
        &self.dst_port
    }
}
295
296impl std::fmt::Display for Connection {
297    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
298        match self.connection_type {
299            ConnectionType::Stream => write!(
300                f,
301                "{}.{} > {}.{}",
302                self.src_block.description.instance_name,
303                self.src_port.name(),
304                self.dst_block.description.instance_name,
305                self.dst_port.name()
306            ),
307            ConnectionType::Message => write!(
308                f,
309                "{}.{} | {}.{}",
310                self.src_block.description.instance_name,
311                self.src_port.name(),
312                self.dst_block.description.instance_name,
313                self.dst_port.name()
314            ),
315        }
316    }
317}
318
#[cfg(test)]
mod tests {
    use crate::Flowgraph;
    use futuresdr_types::BlockDescription;
    use futuresdr_types::BlockId;
    use futuresdr_types::FlowgraphDescription;
    use futuresdr_types::PortId;

    /// Build a non-blocking `test_block` description with the given `id` and instance
    /// `name` and one port of each kind.
    fn block(id: usize, name: &str) -> BlockDescription {
        let single = |port: &str| vec![port.to_string()];

        BlockDescription {
            id: BlockId(id),
            type_name: "test_block".to_string(),
            instance_name: name.to_string(),
            stream_inputs: single("in"),
            stream_outputs: single("out"),
            message_inputs: single("command"),
            message_outputs: single("message"),
            blocking: false,
        }
    }

    /// Blocks can be looked up by ID, by instance name, and by arbitrary predicate.
    #[test]
    fn find_block() {
        let stream_edge = (
            BlockId(0),
            PortId::new("output"),
            BlockId(1),
            PortId::new("input"),
        );
        let message_edge = (
            BlockId(1),
            PortId::new("out"),
            BlockId(0),
            PortId::new("in"),
        );
        let description = FlowgraphDescription {
            blocks: vec![block(0, "a"), block(1, "b")],
            stream_edges: vec![stream_edge],
            message_edges: vec![message_edge],
        };
        // No request is sent by the lookups below, the URL is just a placeholder.
        let fg = Flowgraph {
            id: 0,
            description,
            client: reqwest::Client::new(),
            url: "http://localhost".to_string(),
        };

        let name_of_first = fg.block(BlockId(0)).map(|b| b.description.instance_name);
        assert_eq!(name_of_first, Some("a".to_string()));

        let id_of_b = fg.block_by_name("b").map(|b| b.description.id);
        assert_eq!(id_of_b, Some(BlockId(1)));

        assert!(fg.block_by(|d| d.type_name == "test_block").is_some());
        assert!(fg.block_by(|d| d.type_name == "foo").is_none());
    }
}