ouch_connect/
clt.rs

1use links_bindings_python::prelude::*;
2use links_nonblocking::prelude::*;
3use log::info;
4use ouch_connect_nonblocking::prelude::*;
5use pyo3::{prelude::*, types::PyDict};
6use serde::Serialize;
7use std::time::Duration;
8
9use crate::{DEFAULT_CONNECT_TIMEOUT, DEFAULT_IO_TIMEOUT, DEFAULT_MAX_HBEAT_INTERVAL, DEFAULT_RETRY_CONNECT_AFTER, DEFAULT_USR_PWD};
10
11create_callback_for_messenger!(CltOuchProtocolManual, CltOuchProtocolManualCallback);
12create_clt_sender!(CltManual, CltOuchSender, CltOuchProtocolManual, CltOuchProtocolManualCallback, "ouch_connect");
13
14#[derive(Serialize)]
15struct CltManualConfig {
16    pub connect_timeout: f64,
17    pub retry_connect_after: f64,
18    pub io_timeout: f64,
19    pub name: String,
20}
21impl Default for CltManualConfig {
22    fn default() -> Self {
23        Self {
24            connect_timeout: DEFAULT_CONNECT_TIMEOUT,
25            retry_connect_after: DEFAULT_RETRY_CONNECT_AFTER,
26            io_timeout: DEFAULT_IO_TIMEOUT,
27            name: asserted_short_name!("CltManual", CltManual).to_owned(),
28        }
29    }
30}
31impl From<&PyDict> for CltManualConfig {
32    fn from(py_dict: &PyDict) -> Self {
33        let default = Self::default();
34        let connect_timeout = py_dict.get_item("connect_timeout").unwrap().map_or(default.connect_timeout, |any| any.extract::<f64>().unwrap());
35        let retry_connect_after = py_dict.get_item("retry_connect_after").unwrap().map_or(default.retry_connect_after, |any| any.extract::<f64>().unwrap());
36        let io_timeout = py_dict.get_item("io_timeout").unwrap().map_or(default.io_timeout, |any| any.extract::<f64>().unwrap());
37        let name = py_dict.get_item("name").unwrap().map_or(default.name, |any| any.extract::<String>().unwrap());
38        Self {
39            connect_timeout,
40            retry_connect_after,
41            io_timeout,
42            name,
43        }
44    }
45}
46
47#[pymethods]
48impl CltManual {
49    #[new]
50    #[pyo3(signature = (host, callback, **kwargs))]
51    fn new(_py: Python<'_>, host: &str, callback: PyObject, kwargs: Option<&PyDict>) -> PyResult<Py<Self>> {
52        let config = kwargs.map_or(CltManualConfig::default(), CltManualConfig::from);
53        info!("{}: effective config: {} with kwargs: {:?}", asserted_short_name!("CltManual", Self), serde_json::to_string(&config).unwrap(), kwargs);
54        let sender = {
55            let callback = callback.clone();
56            let sender = _py
57                .allow_threads(move || {
58                    let callback = CltOuchProtocolManualCallback::new_ref(callback);
59                    let protocol = CltOuchProtocolManual::default();
60                    CltOuch::connect(
61                        host,
62                        Duration::from_secs_f64(config.connect_timeout),
63                        Duration::from_secs_f64(config.retry_connect_after),
64                        callback,
65                        protocol,
66                        Some(config.name.as_str()),
67                    )
68                })?
69                .into_sender_with_spawned_recver();
70            let con_id = sender.con_id().clone();
71            Py::new(
72                _py,
73                Self {
74                    sender: Some(sender).into(),
75                    con_id,
76                    io_timeout: Some(config.io_timeout),
77                },
78            )?
79        };
80        patch_callback_if_settable_sender!(_py, sender, callback, asserted_short_name!("CltManual", Self));
81        Ok(sender)
82    }
83    #[classattr]
84    fn msg_samples() -> Vec<String> {
85        ouch_connect_nonblocking::prelude::clt_ouch_default_msgs().iter().map(|m| serde_json::to_string(m).unwrap()).collect::<Vec<_>>()
86    }
87}
88
89create_callback_for_messenger!(CltOuchProtocolAuto, CltOuchProtocolAutoCallback);
90create_clt_sender!(CltAuto, CltOuchSenderRef, CltOuchProtocolAuto, CltOuchProtocolAutoCallback, "ouch_connect");
91
92#[derive(Serialize)]
93struct CltAutoConfig {
94    pub username: String,
95    pub password: String,
96    pub session_id: String,
97    pub sequence: usize,
98    pub clt_max_hbeat_interval: f64,
99    pub svc_max_hbeat_interval: f64,
100    pub connect_timeout: f64,
101    pub retry_connect_after: f64,
102    pub io_timeout: f64,
103    pub name: String,
104}
105impl Default for CltAutoConfig {
106    fn default() -> Self {
107        Self {
108            username: DEFAULT_USR_PWD.to_owned(),
109            password: DEFAULT_USR_PWD.to_owned(),
110            session_id: "".to_owned(),
111            sequence: 0,
112            clt_max_hbeat_interval: DEFAULT_MAX_HBEAT_INTERVAL,
113            svc_max_hbeat_interval: DEFAULT_MAX_HBEAT_INTERVAL,
114            connect_timeout: DEFAULT_CONNECT_TIMEOUT,
115            retry_connect_after: DEFAULT_RETRY_CONNECT_AFTER,
116            io_timeout: DEFAULT_IO_TIMEOUT,
117            name: asserted_short_name!("CltAuto", CltAuto).to_owned(),
118        }
119    }
120}
121impl From<&PyDict> for CltAutoConfig {
122    fn from(value: &PyDict) -> Self {
123        let default = Self::default();
124        let username = value.get_item("username").unwrap().map_or(default.username, |any| any.extract::<String>().unwrap());
125        let password = value.get_item("password").unwrap().map_or(default.password, |any| any.extract::<String>().unwrap());
126        let session_id = value.get_item("session_id").unwrap().map_or(default.session_id, |any| any.extract::<String>().unwrap());
127        let sequence = value.get_item("sequence").unwrap().map_or(default.sequence, |any| any.extract::<usize>().unwrap());
128        let clt_max_hbeat_interval = value.get_item("clt_max_hbeat_interval").unwrap().map_or(default.clt_max_hbeat_interval, |any| any.extract::<f64>().unwrap());
129        let svc_max_hbeat_interval = value.get_item("svc_max_hbeat_interval").unwrap().map_or(default.svc_max_hbeat_interval, |any| any.extract::<f64>().unwrap());
130        let connect_timeout = value.get_item("connect_timeout").unwrap().map_or(default.connect_timeout, |any| any.extract::<f64>().unwrap());
131        let retry_connect_after = value.get_item("retry_connect_after").unwrap().map_or(default.retry_connect_after, |any| any.extract::<f64>().unwrap());
132        let io_timeout = value.get_item("io_timeout").unwrap().map_or(default.io_timeout, |any| any.extract::<f64>().unwrap());
133        let name = value.get_item("name").unwrap().map_or(default.name, |any| any.extract::<String>().unwrap());
134        Self {
135            username,
136            password,
137            session_id,
138            sequence,
139            clt_max_hbeat_interval,
140            svc_max_hbeat_interval,
141            connect_timeout,
142            retry_connect_after,
143            io_timeout,
144            name,
145        }
146    }
147}
148
149#[pymethods]
150impl CltAuto {
151    #[new]
152    #[pyo3(signature = (host, callback, **kwargs))]
153    fn new(_py: Python<'_>, host: &str, callback: PyObject, kwargs: Option<&PyDict>) -> PyResult<Py<Self>> {
154        let config = kwargs.map_or(CltAutoConfig::default(), CltAutoConfig::from);
155        info!("{}: effective config: {} with kwargs: {:?}", asserted_short_name!("CltAuto", Self), serde_json::to_string(&config).unwrap(), kwargs);
156
157        let sender = {
158            let callback = CltOuchProtocolAutoCallback::new_ref(callback.clone());
159
160            let protocol = CltOuchProtocolAuto::new(
161                UserName::from(config.username.as_str()),
162                Password::from(config.password.as_str()),
163                SessionId::from(config.session_id.as_str()),
164                SequenceNumber::from(config.sequence),
165                Duration::from_secs_f64(config.io_timeout),
166                Duration::from_secs_f64(config.clt_max_hbeat_interval),
167                Duration::from_secs_f64(config.svc_max_hbeat_interval),
168            );
169
170            let sender = _py
171                .allow_threads(move || {
172                    CltOuch::connect(
173                        host,
174                        Duration::from_secs_f64(config.connect_timeout),
175                        Duration::from_secs_f64(config.retry_connect_after),
176                        callback,
177                        protocol,
178                        Some(config.name.as_str()),
179                    )
180                })?
181                .into_sender_with_spawned_recver_ref();
182            let con_id = sender.con_id().clone();
183            Py::new(
184                _py,
185                Self {
186                    sender: Some(sender).into(),
187                    con_id,
188                    io_timeout: Some(config.io_timeout),
189                },
190            )?
191        };
192
193        patch_callback_if_settable_sender!(_py, sender, callback, asserted_short_name!("CltAuto", Self));
194        Ok(sender)
195    }
196    #[classattr]
197    fn msg_samples() -> Vec<String> {
198        ouch_connect_nonblocking::prelude::clt_ouch_default_msgs().iter().map(|m| serde_json::to_string(m).unwrap()).collect::<Vec<_>>()
199    }
200}