// gst_client_rs/client.rs

//! Defines [`GstClient`] for communicating with the
//! [`GStreamer Daemon`][1] API.
//!
//! [1]: https://developer.ridgerun.com/wiki/index.php/GStreamer_Daemon
5use crate::{gstd_types, resources, Error};
6use reqwest::{Client, Response};
7use url::Url;
8
9/// [`GstClient`] for [`GStreamer Daemon`][1] API.
10///
11/// [1]: https://developer.ridgerun.com/wiki/index.php/GStreamer_Daemon
12#[derive(Debug, Clone)]
13pub struct GstClient {
14    http_client: Client,
15    pub(crate) base_url: Url,
16}
17
18impl GstClient {
19    /// Build [`GstClient`] for future call to [`GStreamer Daemon`][1] API.
20    ///
21    /// # Errors
22    ///
23    /// If incorrect `base_url` passed
24    ///
25    /// [1]: https://developer.ridgerun.com/wiki/index.php/GStreamer_Daemon
26    pub fn build<S: Into<String>>(base_url: S) -> Result<Self, Error> {
27        Ok(Self {
28            http_client: Client::new(),
29            base_url: Url::parse(&base_url.into()).map_err(Error::IncorrectBaseUrl)?,
30        })
31    }
32
33    pub(crate) async fn get(&self, url: reqwest::Url) -> Result<Response, Error> {
34        self.http_client
35            .get(url)
36            .send()
37            .await
38            .map_err(Error::RequestFailed)
39    }
40
41    pub(crate) async fn post(&self, url: reqwest::Url) -> Result<Response, Error> {
42        self.http_client
43            .post(url)
44            .send()
45            .await
46            .map_err(Error::RequestFailed)
47    }
48
49    pub(crate) async fn put(&self, url: reqwest::Url) -> Result<Response, Error> {
50        self.http_client
51            .put(url)
52            .send()
53            .await
54            .map_err(Error::RequestFailed)
55    }
56
57    pub(crate) async fn delete(&self, url: reqwest::Url) -> Result<Response, Error> {
58        self.http_client
59            .delete(url)
60            .send()
61            .await
62            .map_err(Error::RequestFailed)
63    }
64
65    pub(crate) async fn process_resp(&self, resp: Response) -> Result<gstd_types::Response, Error> {
66        if !resp.status().is_success() {
67            let status = resp.status();
68            let error = &resp.text().await.map_err(Error::BadBody)?;
69            return Err(Error::BadStatus(status, Some(error.to_string())));
70        }
71
72        let res = resp
73            .json::<gstd_types::Response>()
74            .await
75            .map_err(Error::BadBody)?;
76
77        if res.code != gstd_types::ResponseCode::Success {
78            return Err(Error::GstdError(res.code));
79        }
80        Ok(res)
81    }
82
83    /// Performs `GET /pipelines` API request, returning the
84    /// parsed [`gstd_types::Response`]
85    ///
86    /// # Errors
87    ///
88    /// If API request cannot be performed, or fails.
89    /// See [`Error`] for details.
90    pub async fn pipelines(&self) -> Result<gstd_types::Response, Error> {
91        let url = self
92            .base_url
93            .join("pipelines")
94            .map_err(Error::IncorrectApiUrl)?;
95        let resp = self.get(url).await?;
96        self.process_resp(resp).await
97    }
98    /// Operate with [`GStreamer Daemon`][1] pipelines.
99    ///
100    /// # Arguments
101    ///
102    /// * `name` - name of the pipeline
103    ///
104    /// [1]: https://developer.ridgerun.com/wiki/index.php/GStreamer_Daemon
105    #[must_use]
106    pub fn pipeline<S>(&self, name: S) -> resources::Pipeline
107    where
108        S: Into<String>,
109    {
110        resources::Pipeline::new(name, self)
111    }
112    /// Manage [`GStreamer Daemon`][1] Debug mode.
113    ///
114    /// [1]: https://developer.ridgerun.com/wiki/index.php/GStreamer_Daemon
115    #[must_use]
116    pub fn debug(&self) -> resources::Debug {
117        resources::Debug::new(self)
118    }
119}
120
121impl Default for GstClient {
122    fn default() -> Self {
123        Self {
124            http_client: Client::new(),
125            base_url: Url::parse("http://127.0.0.1:5001").unwrap(),
126        }
127    }
128}
129
130impl From<Url> for GstClient {
131    fn from(url: Url) -> Self {
132        Self {
133            http_client: Client::new(),
134            base_url: url,
135        }
136    }
137}
138
139impl From<&Url> for GstClient {
140    fn from(url: &Url) -> Self {
141        Self {
142            http_client: Client::new(),
143            base_url: url.clone(),
144        }
145    }
146}
147
#[cfg(test)]
mod spec {
    //! Tests for [`GstClient`].
    //!
    //! The `process_*` and `create_client_*` tests run without sending any
    //! HTTP request. The tests marked `#[ignore]` call API methods against
    //! `BASE_URL` and assert success, so they presumably need a daemon
    //! reachable there (and, for most of them, a pipeline named
    //! `PIPELINE_NAME`); run them explicitly with `cargo test -- --ignored`.
    use super::*;

    /// Base URL every test client is built with.
    const BASE_URL: &str = "http://localhost:5002";
    /// Pipeline name used by the ignored integration tests.
    const PIPELINE_NAME: &str = "test pipeline";

    /// Canned body for a pipeline `state` property read.
    const STATE_RESPONSE: &str = r#"
    {
        "code" : 0,
        "description" : "Success",
        "response" : {
          "name" : "state",
          "value" : "playing",
          "param" : {
              "description" : "The state of the pipeline",
              "type" : "GstdStateEnum",
              "access" : "((GstdParamFlags) READ | 2)"
          }
        }
      }
    "#;

    /// Canned body carrying a `splitmuxsink-fragment-opened` message.
    const SPLITMUXSINK_FRAGMENT_OPENED: &str = r#"
    {
        "code" : 0,
        "description" : "Success",
        "response" : {
          "type" : "element",
          "source" : "h264_splitmuxsink",
          "timestamp" : "99:99:99.999999999",
          "seqnum" : 7535,
          "splitmuxsink-fragment-opened" : {
              "location" : "/home/printnanny/.local/share/printnanny/video/88e1cc18-962f-4ec4-baeb-656b752520e5/00004.mp4",
              "running-time" : 1020189067077,
              "sink" : "(GstFileSink) sink"
          }
        }
      }
    "#;

    /// Canned body carrying a `splitmuxsink-fragment-closed` message.
    const SPLITMUXSINK_FRAGMENT_CLOSED: &str = r#"
    {
        "code" : 0,
        "description" : "Success",
        "response" : {
          "type" : "element",
          "source" : "h264_splitmuxsink",
          "timestamp" : "99:99:99.999999999",
          "seqnum" : 7535,
          "splitmuxsink-fragment-closed" : {
              "location" : "/home/printnanny/.local/share/printnanny/video/88e1cc18-962f-4ec4-baeb-656b752520e5/00004.mp4",
              "running-time" : 1020189067077,
              "sink" : "(GstFileSink) sink"
          }
        }
      }
    "#;

    /// Parsed form of `BASE_URL`, used as the expected `base_url`.
    fn expect_url() -> Url {
        Url::parse(BASE_URL).unwrap()
    }

    /// Builds a client for `BASE_URL`.
    ///
    /// Panics instead of returning a `Result` so that a test can never pass
    /// vacuously because the client could not be constructed.
    fn client() -> GstClient {
        GstClient::build(BASE_URL).expect("BASE_URL must be a valid URL")
    }

    /// Wraps `body` into an HTTP 200 response that can be fed to
    /// `GstClient::process_resp` without sending a request.
    fn ok_response(body: &'static str) -> reqwest::Response {
        http::Response::builder()
            .status(200)
            .body(body)
            .expect("static response must be buildable")
            .into()
    }

    #[tokio::test]
    async fn process_state_response() {
        let res = client()
            .process_resp(ok_response(STATE_RESPONSE))
            .await
            .unwrap();

        let expected = gstd_types::ResponseT::Property(gstd_types::Property {
            name: "state".into(),
            value: gstd_types::PropertyValue::String("playing".into()),
            param: gstd_types::Param {
                description: "The state of the pipeline".into(),
                _type: "GstdStateEnum".into(),
                access: "((GstdParamFlags) READ | 2)".into(),
            },
        });

        assert_eq!(res.response, expected);
    }

    #[tokio::test]
    async fn process_splitmuxsink_fragment_opened() {
        let res = client()
            .process_resp(ok_response(SPLITMUXSINK_FRAGMENT_OPENED))
            .await
            .unwrap();

        let expected = gstd_types::ResponseT::GstSplitMuxSinkFragmentOpened(
            gstd_types::GstSplitMuxSinkFragmentOpened {
                r#type: "element".to_string(),
                source: "h264_splitmuxsink".to_string(),
                timestamp: "99:99:99.999999999".to_string(),
                seqnum: 7535,
                message: gstd_types::GstSplitMuxSinkFragmentMessage {
                    location:  "/home/printnanny/.local/share/printnanny/video/88e1cc18-962f-4ec4-baeb-656b752520e5/00004.mp4".to_string(),
                    running_time: 1020189067077,
                    sink: "(GstFileSink) sink".to_string()
                }
            },
        );

        assert_eq!(res.response, expected);
    }

    #[tokio::test]
    async fn process_splitmuxsink_fragment_closed() {
        let res = client()
            .process_resp(ok_response(SPLITMUXSINK_FRAGMENT_CLOSED))
            .await
            .unwrap();

        let expected = gstd_types::ResponseT::GstSplitMuxSinkFragmentClosed(
            gstd_types::GstSplitMuxSinkFragmentClosed {
                r#type: "element".to_string(),
                source: "h264_splitmuxsink".to_string(),
                timestamp: "99:99:99.999999999".to_string(),
                seqnum: 7535,
                message: gstd_types::GstSplitMuxSinkFragmentMessage {
                    location:  "/home/printnanny/.local/share/printnanny/video/88e1cc18-962f-4ec4-baeb-656b752520e5/00004.mp4".to_string(),
                    running_time: 1020189067077,
                    sink: "(GstFileSink) sink".to_string()
                }
            },
        );

        assert_eq!(res.response, expected);
    }

    // Pure construction check: no request is sent, so it is not ignored.
    #[test]
    fn create_client_with_build() {
        let client = GstClient::build(BASE_URL).unwrap();
        assert_eq!(client.base_url, expect_url());

        let client = GstClient::build(BASE_URL.to_string()).unwrap();
        assert_eq!(client.base_url, expect_url());
    }

    // Pure conversion check: no request is sent, so it is not ignored.
    #[test]
    fn create_client_from() {
        let url = expect_url();
        let client = GstClient::from(&url);
        assert_eq!(client.base_url, expect_url());

        let client = GstClient::from(url);
        assert_eq!(client.base_url, expect_url());
    }

    #[ignore = "needs a GStreamer Daemon reachable at BASE_URL"]
    #[tokio::test]
    async fn create_pipeline() {
        let client = client();
        let res = client.pipeline(PIPELINE_NAME).create("").await;
        println!("{:?}", res);
        assert!(res.is_ok());
    }

    #[ignore = "needs a GStreamer Daemon reachable at BASE_URL"]
    #[tokio::test]
    async fn retrieve_pipelines() {
        let client = client();
        let res = client.pipelines().await;
        println!("{:?}", res);
        assert!(res.is_ok());
    }

    #[ignore = "needs a GStreamer Daemon reachable at BASE_URL"]
    #[tokio::test]
    async fn retrieve_pipeline_graph() {
        let client = client();
        let res = client.pipeline(PIPELINE_NAME).graph().await;
        println!("{:?}", res);
        assert!(res.is_ok());
    }

    #[ignore = "needs a GStreamer Daemon reachable at BASE_URL"]
    #[tokio::test]
    async fn retrieve_pipeline_elements() {
        let client = client();
        let res = client.pipeline(PIPELINE_NAME).elements().await;
        println!("{:?}", res);
        assert!(res.is_ok());
    }

    #[ignore = "needs a GStreamer Daemon reachable at BASE_URL"]
    #[tokio::test]
    async fn retrieve_pipeline_properties() {
        let client = client();
        let res = client.pipeline(PIPELINE_NAME).properties().await;
        println!("{:?}", res);
        assert!(res.is_ok());
    }

    #[ignore = "needs a GStreamer Daemon reachable at BASE_URL"]
    #[tokio::test]
    async fn retrieve_pipeline_element_property() {
        let client = client();
        let res = client
            .pipeline(PIPELINE_NAME)
            .element("rtmp2src")
            .property("location")
            .await;
        println!("{:?}", res);
        assert!(res.is_ok());
    }

    #[ignore = "needs a GStreamer Daemon reachable at BASE_URL"]
    #[tokio::test]
    async fn retrieve_pipeline_bus_read() {
        let client = client();
        let res = client.pipeline(PIPELINE_NAME).bus().read().await;
        println!("{:?}", res);
        assert!(res.is_ok());
    }
}