Skip to main content

datex_native/com_interfaces/http/
http_client.rs

1use datex_core::channel::mpsc::create_unbounded_channel;
2use datex_core::derive_setup_data;
3use datex_core::network::com_hub::errors::ComInterfaceCreateError;
4use datex_core::network::com_hub::managers::com_interface_manager::ComInterfaceAsyncFactoryResult;
5use datex_core::network::com_interfaces::com_interface::factory::{ComInterfaceAsyncFactory, ComInterfaceConfiguration, SendCallback, SendFailure, SocketConfiguration, SocketProperties};
6use datex_core::network::com_interfaces::com_interface::properties::{ComInterfaceProperties, InterfaceDirection};
7use datex_core::network::com_interfaces::default_setup_data::http::http_client::HTTPClientInterfaceSetupData;
8
9derive_setup_data!(HTTPClientInterfaceSetupDataNative, HTTPClientInterfaceSetupData);
10
11impl HTTPClientInterfaceSetupDataNative {
12    async fn create_interface(self) -> Result<ComInterfaceConfiguration, ComInterfaceCreateError> {
13
14        let (response_sender, mut response_receiver) = create_unbounded_channel::<Vec<u8>>();
15
16        Ok(ComInterfaceConfiguration::new_single_socket(
17            ComInterfaceProperties {
18                name: Some(self.url.clone()),
19                ..Self::get_default_properties()
20            },
21            SocketConfiguration::new_in_out(
22                SocketProperties::new(InterfaceDirection::InOut, 1),
23                async gen move {
24                    while let Some(response_data) = response_receiver.next().await {
25                        yield Ok(response_data);
26                    }
27                },
28                SendCallback::new_async(move |block| {
29                    let url = self.url.clone();
30                    let mut response_sender = response_sender.clone();
31                    async move {
32                        let client = reqwest::Client::new();
33                        let response = client.post(&url)
34                            .body(block.to_bytes())
35                            .send()
36                            .await
37                            .map_err(|e| {
38                                println!("HTTP request error: {:#?}", e);
39                                SendFailure(Box::new(block.clone()))
40                            })?;
41                        let status = response.status();
42                        let bytes = response.bytes().await
43                            .map_err(|e| {
44                                println!("HTTP response read error: {:#?}", e);
45                                SendFailure(Box::new(block.clone()))
46                            })?;
47                        response_sender.start_send(bytes.to_vec())
48                            .unwrap();
49
50                        if status.is_success() {
51                            Ok(())
52                        } else {
53                            println!("HTTP request failed with status: {}", status);
54                            Err(SendFailure(Box::new(block)))
55                        }
56                    }
57                })
58            )
59        ))
60    }
61}
62
63impl ComInterfaceAsyncFactory for HTTPClientInterfaceSetupDataNative {
64    fn create_interface(self) -> ComInterfaceAsyncFactoryResult {
65        Box::pin(self.create_interface())
66    }
67
68    fn get_default_properties() -> ComInterfaceProperties {
69        HTTPClientInterfaceSetupData::get_default_properties()
70    }
71}