1use solace_rs_sys as ffi;
2use std::{
3 ffi::{CString, NulError},
4 marker::PhantomData,
5 mem, ptr,
6};
7
8use crate::{
9 message::InboundMessage,
10 session::SessionEvent,
11 util::{
12 get_last_error_info, on_event_trampoline, on_message_trampoline, static_no_op_on_event,
13 static_no_op_on_message,
14 },
15 Context, Session, SolClientReturnCode, SolClientSubCode,
16};
17
18#[derive(thiserror::Error, Debug)]
19pub enum SessionBuilderError {
20 #[error("session failed to initialize. SolClient return code: {0} subcode: {1}")]
21 InitializationFailure(SolClientReturnCode, SolClientSubCode),
22 #[error("session failed to connect. SolClient return code: {0} subcode: {1}")]
23 ConnectionFailure(SolClientReturnCode, SolClientSubCode),
24 #[error("arg contains interior nul byte")]
25 InvalidArgs(#[from] NulError),
26 #[error("{0} arg need to be set")]
27 MissingRequiredArgs(String),
28 #[error("{0} valid range is {1} foound {2}")]
29 InvalidRange(String, String, String),
30}
31
/// File-local shorthand for results whose error type is `SessionBuilderError`.
type Result<T> = std::result::Result<T, SessionBuilderError>;
33
34fn bool_to_ptr(b: bool) -> *const i8 {
35 if b {
36 ffi::SOLCLIENT_PROP_ENABLE_VAL.as_ptr() as *const i8
37 } else {
38 ffi::SOLCLIENT_PROP_DISABLE_VAL.as_ptr() as *const i8
39 }
40}
41
/// Raw, not-yet-validated session settings collected by `SessionBuilder`.
///
/// Every field starts out as `None` (see the `Default` impl) and is filled in by the
/// builder setters where one exists. `CheckedSessionProps::try_from` validates the
/// values and converts them into C strings.
struct UncheckedSessionProps<Host, Vpn, Username, Password> {
    // Mandatory settings: conversion fails with `MissingRequiredArgs` if any is `None`.
    host_name: Option<Host>,
    vpn_name: Option<Vpn>,
    username: Option<Username>,
    password: Option<Password>,

    // Optional settings. Numeric values are range-checked during conversion where a
    // limit applies; booleans are carried over unchanged.
    buffer_size_bytes: Option<u64>,
    block_write_timeout_ms: Option<u64>,
    connect_timeout_ms: Option<u64>,
    subconfirm_timeout_ms: Option<u64>,
    ignore_dup_subscription_error: Option<bool>,
    tcp_nodelay: Option<bool>,
    socket_send_buf_size_bytes: Option<u64>,
    socket_rcv_buf_size_bytes: Option<u64>,
    keep_alive_interval_ms: Option<u64>,
    keep_alive_limit: Option<u64>,
    application_description: Option<Vec<u8>>,
    client_name: Option<Vec<u8>>,
    compression_level: Option<u8>,
    generate_rcv_timestamps: Option<bool>,
    generate_send_timestamp: Option<bool>,
    generate_sender_id: Option<bool>,
    generate_sender_sequence_number: Option<bool>,
    connect_retries_per_host: Option<i64>,
    connect_retries: Option<i64>,
    reconnect_retries: Option<i64>,
    reconnect_retry_wait_ms: Option<u64>,
    reapply_subscriptions: Option<bool>,
    provision_timeout_ms: Option<u64>,
    calculate_message_expiration: Option<bool>,
    no_local: Option<bool>,
    modifyprop_timeout_ms: Option<u64>,

    // The remaining fields are stored but never read in this file: there is no setter
    // for them and `CheckedSessionProps` has no counterpart, hence `allow(dead_code)`.
    #[allow(dead_code)]
    send_blocking: Option<bool>,
    #[allow(dead_code)]
    subscribe_blocking: Option<bool>,
    #[allow(dead_code)]
    block_while_connecting: Option<bool>,

    #[allow(dead_code)]
    topic_dispatch: Option<bool>,
}
96
97impl<Host, Vpn, Username, Password> Default
98 for UncheckedSessionProps<Host, Vpn, Username, Password>
99{
100 fn default() -> Self {
101 Self {
102 host_name: None,
103 vpn_name: None,
104 username: None,
105 password: None,
106 buffer_size_bytes: None,
107 block_write_timeout_ms: None,
108 connect_timeout_ms: None,
109 subconfirm_timeout_ms: None,
110 ignore_dup_subscription_error: None,
111 tcp_nodelay: None,
112 socket_send_buf_size_bytes: None,
113 socket_rcv_buf_size_bytes: None,
114 keep_alive_interval_ms: None,
115 keep_alive_limit: None,
116 application_description: None,
117 client_name: None,
118 compression_level: None,
119 generate_rcv_timestamps: None,
120 generate_send_timestamp: None,
121 generate_sender_id: None,
122 generate_sender_sequence_number: None,
123 connect_retries_per_host: None,
124 connect_retries: None,
125 reconnect_retries: None,
126 reconnect_retry_wait_ms: None,
127 reapply_subscriptions: None,
128 provision_timeout_ms: None,
129 calculate_message_expiration: None,
130 no_local: None,
131 modifyprop_timeout_ms: None,
132 send_blocking: None,
133 subscribe_blocking: None,
134 block_while_connecting: None,
135 topic_dispatch: None,
136 }
137 }
138}
139
140pub struct SessionBuilder<Host, Vpn, Username, Password, OnMessage, OnEvent> {
145 context: Context,
146 props: UncheckedSessionProps<Host, Vpn, Username, Password>,
147
148 on_message: Option<OnMessage>,
150 on_event: Option<OnEvent>,
151}
152
153impl<Host, Vpn, Username, Password, OnMessage, OnEvent>
154 SessionBuilder<Host, Vpn, Username, Password, OnMessage, OnEvent>
155{
156 pub(crate) fn new(context: Context) -> Self {
157 Self {
158 context,
159 props: UncheckedSessionProps::default(),
160 on_message: None,
161 on_event: None,
162 }
163 }
164}
165
166impl<'session, Host, Vpn, Username, Password, OnMessage, OnEvent>
167 SessionBuilder<Host, Vpn, Username, Password, OnMessage, OnEvent>
168where
169 Host: Into<Vec<u8>>,
170 Vpn: Into<Vec<u8>>,
171 Username: Into<Vec<u8>>,
172 Password: Into<Vec<u8>>,
173 OnMessage: FnMut(InboundMessage) + Send + 'session,
174 OnEvent: FnMut(SessionEvent) + Send + 'session,
175{
176 pub fn build(mut self) -> Result<Session<'session, OnMessage, OnEvent>> {
177 let config = CheckedSessionProps::try_from(mem::take(&mut self.props))?;
178
179 let mut session_pt: ffi::solClient_opaqueSession_pt = ptr::null_mut();
185
186 let (static_on_message_callback, user_on_message, msg_func_ptr) = match self.on_message {
193 Some(f) => {
194 let tramp = on_message_trampoline(&f);
195 let mut func = Box::new(Box::new(f));
196 (tramp, func.as_mut() as *const _ as *mut _, Some(func))
197 }
198 _ => (
199 Some(static_no_op_on_message as unsafe extern "C" fn(_, _, _) -> u32),
200 ptr::null_mut(),
201 None,
202 ),
203 };
204
205 let (static_on_event_callback, user_on_event, event_func_ptr) = match self.on_event {
206 Some(f) => {
207 let tramp = on_event_trampoline(&f);
208 let mut func = Box::new(Box::new(f));
209 (tramp, func.as_mut() as *const _ as *mut _, Some(func))
210 }
211 _ => (
212 Some(static_no_op_on_event as unsafe extern "C" fn(_, _, _)),
213 ptr::null_mut(),
214 None,
215 ),
216 };
217
218 let mut session_func_info: ffi::solClient_session_createFuncInfo_t =
221 ffi::solClient_session_createFuncInfo {
222 rxInfo: ffi::solClient_session_createRxCallbackFuncInfo {
223 callback_p: ptr::null_mut(),
224 user_p: ptr::null_mut(),
225 },
226 eventInfo: ffi::solClient_session_createEventCallbackFuncInfo {
227 callback_p: static_on_event_callback,
228 user_p: user_on_event,
229 },
230 rxMsgInfo: ffi::solClient_session_createRxMsgCallbackFuncInfo {
231 callback_p: static_on_message_callback,
232 user_p: user_on_message,
233 },
234 };
235
236 let mut raw = config.to_raw();
237 let context_ptr = self.context.raw.lock().unwrap();
238 let session_create_raw_rc = unsafe {
239 ffi::solClient_session_create(
240 raw.as_mut_ptr(),
241 context_ptr.ctx,
242 &mut session_pt,
243 &mut session_func_info,
244 std::mem::size_of::<ffi::solClient_session_createFuncInfo_t>(),
245 )
246 };
247 drop(context_ptr);
248
249 let rc = SolClientReturnCode::from_raw(session_create_raw_rc);
250
251 if !rc.is_ok() {
252 let subcode = get_last_error_info();
253 return Err(SessionBuilderError::InitializationFailure(rc, subcode));
254 }
255
256 let connection_raw_rc = unsafe { ffi::solClient_session_connect(session_pt) };
257
258 let rc = SolClientReturnCode::from_raw(connection_raw_rc);
259 if rc.is_ok() {
260 Ok(Session {
261 _msg_fn_ptr: msg_func_ptr,
262 _event_fn_ptr: event_func_ptr,
263 _session_ptr: session_pt,
264 context: self.context,
265 lifetime: PhantomData,
266 })
267 } else {
268 let subcode = get_last_error_info();
269 Err(SessionBuilderError::ConnectionFailure(rc, subcode))
270 }
271 }
272
273 pub fn host_name(mut self, host_name: Host) -> Self {
274 self.props.host_name = Some(host_name);
275 self
276 }
277
278 pub fn vpn_name(mut self, vpn_name: Vpn) -> Self {
279 self.props.vpn_name = Some(vpn_name);
280 self
281 }
282 pub fn username(mut self, username: Username) -> Self {
283 self.props.username = Some(username);
284 self
285 }
286 pub fn password(mut self, password: Password) -> Self {
287 self.props.password = Some(password);
288 self
289 }
290
291 pub fn on_message(mut self, on_message: OnMessage) -> Self {
292 self.on_message = Some(on_message);
293 self
294 }
295
296 pub fn on_event(mut self, on_event: OnEvent) -> Self {
297 self.on_event = Some(on_event);
298 self
299 }
300
301 pub fn buffer_size_bytes(mut self, buffer_size_bytes: u64) -> Self {
302 self.props.buffer_size_bytes = Some(buffer_size_bytes);
303 self
304 }
305 pub fn block_write_timeout_ms(mut self, write_timeout_ms: u64) -> Self {
306 self.props.block_write_timeout_ms = Some(write_timeout_ms);
307 self
308 }
309 pub fn connect_timeout_ms(mut self, connect_timeout_ms: u64) -> Self {
310 self.props.connect_timeout_ms = Some(connect_timeout_ms);
311 self
312 }
313 pub fn subconfirm_timeout_ms(mut self, subconfirm_timeout_ms: u64) -> Self {
314 self.props.subconfirm_timeout_ms = Some(subconfirm_timeout_ms);
315 self
316 }
317 pub fn ignore_dup_subscription_error(mut self, ignore_dup_subscription_error: bool) -> Self {
318 self.props.ignore_dup_subscription_error = Some(ignore_dup_subscription_error);
319 self
320 }
321 pub fn tcp_nodelay(mut self, tcp_nodelay: bool) -> Self {
322 self.props.tcp_nodelay = Some(tcp_nodelay);
323 self
324 }
325 pub fn socket_send_buf_size_bytes(mut self, socket_send_buf_size_bytes: u64) -> Self {
326 self.props.socket_send_buf_size_bytes = Some(socket_send_buf_size_bytes);
327 self
328 }
329 pub fn socket_rcv_buf_size_bytes(mut self, socket_rcv_buf_size_bytes: u64) -> Self {
330 self.props.socket_rcv_buf_size_bytes = Some(socket_rcv_buf_size_bytes);
331 self
332 }
333 pub fn keep_alive_interval_ms(mut self, keep_alive_interval_ms: u64) -> Self {
334 self.props.keep_alive_interval_ms = Some(keep_alive_interval_ms);
335 self
336 }
337 pub fn keep_alive_limit(mut self, keep_alive_limit: u64) -> Self {
338 self.props.keep_alive_limit = Some(keep_alive_limit);
339 self
340 }
341 pub fn application_description<AppDescription: Into<Vec<u8>>>(
342 mut self,
343 application_description: AppDescription,
344 ) -> Self {
345 self.props.application_description = Some(application_description.into());
346 self
347 }
348 pub fn client_name<ClientName: Into<Vec<u8>>>(mut self, client_name: ClientName) -> Self {
349 self.props.client_name = Some(client_name.into());
350 self
351 }
352 pub fn compression_level(mut self, compression_level: u8) -> Self {
353 self.props.compression_level = Some(compression_level);
354 self
355 }
356 pub fn generate_rcv_timestamps(mut self, generate_rcv_timestamps: bool) -> Self {
357 self.props.generate_rcv_timestamps = Some(generate_rcv_timestamps);
358 self
359 }
360 pub fn generate_send_timestamp(mut self, generate_send_timestamp: bool) -> Self {
361 self.props.generate_send_timestamp = Some(generate_send_timestamp);
362 self
363 }
364 pub fn generate_sender_id(mut self, generate_sender_id: bool) -> Self {
365 self.props.generate_sender_id = Some(generate_sender_id);
366 self
367 }
368 pub fn generate_sender_sequence_number(
369 mut self,
370 generate_sender_sequence_number: bool,
371 ) -> Self {
372 self.props.generate_sender_sequence_number = Some(generate_sender_sequence_number);
373 self
374 }
375 pub fn connect_retries_per_host(mut self, connect_retries_per_host: i64) -> Self {
376 self.props.connect_retries_per_host = Some(connect_retries_per_host);
377 self
378 }
379 pub fn connect_retries(mut self, connect_retries: i64) -> Self {
380 self.props.connect_retries = Some(connect_retries);
381 self
382 }
383 pub fn reconnect_retries(mut self, reconnect_retries: i64) -> Self {
384 self.props.reconnect_retries = Some(reconnect_retries);
385 self
386 }
387 pub fn reconnect_retry_wait_ms(mut self, reconnect_retry_wait_ms: u64) -> Self {
388 self.props.reconnect_retry_wait_ms = Some(reconnect_retry_wait_ms);
389 self
390 }
391 pub fn reapply_subscriptions(mut self, reapply_subscriptions: bool) -> Self {
392 self.props.reapply_subscriptions = Some(reapply_subscriptions);
393 self
394 }
395 pub fn provision_timeout_ms(mut self, provision_timeout_ms: u64) -> Self {
396 self.props.provision_timeout_ms = Some(provision_timeout_ms);
397 self
398 }
399 pub fn calculate_message_expiration(mut self, calculate_message_expiration: bool) -> Self {
400 self.props.calculate_message_expiration = Some(calculate_message_expiration);
401 self
402 }
403 pub fn no_local(mut self, no_local: bool) -> Self {
404 self.props.no_local = Some(no_local);
405 self
406 }
407 pub fn modifyprop_timeout_ms(mut self, modifyprop_timeout_ms: u64) -> Self {
408 self.props.modifyprop_timeout_ms = Some(modifyprop_timeout_ms);
409 self
410 }
411}
412
/// Validated session settings in the form handed to the C API.
///
/// Produced from `UncheckedSessionProps` via `TryFrom`. Strings and numbers are held as
/// owned `CString`s so that `to_raw` can expose them as raw pointers; booleans stay as
/// `bool` and are mapped to a pointer by `bool_to_ptr` inside `to_raw`.
struct CheckedSessionProps {
    // Mandatory settings; always present after validation.
    host_name: CString,
    vpn_name: CString,
    username: CString,
    password: CString,

    // Optional settings; `None` means the property is left out of the raw array.
    buffer_size_bytes: Option<CString>,
    block_write_timeout_ms: Option<CString>,
    connect_timeout_ms: Option<CString>,
    subconfirm_timeout_ms: Option<CString>,
    ignore_dup_subscription_error: Option<bool>,
    tcp_nodelay: Option<bool>,
    socket_send_buf_size_bytes: Option<CString>,
    socket_rcv_buf_size_bytes: Option<CString>,
    keep_alive_interval_ms: Option<CString>,
    keep_alive_limit: Option<CString>,
    application_description: Option<CString>,
    client_name: Option<CString>,
    compression_level: Option<CString>,
    generate_rcv_timestamps: Option<bool>,
    generate_send_timestamp: Option<bool>,
    generate_sender_id: Option<bool>,
    generate_sender_sequence_number: Option<bool>,
    connect_retries_per_host: Option<CString>,
    connect_retries: Option<CString>,
    reconnect_retries: Option<CString>,
    reconnect_retry_wait_ms: Option<CString>,
    reapply_subscriptions: Option<bool>,
    provision_timeout_ms: Option<CString>,
    calculate_message_expiration: Option<bool>,
    no_local: Option<bool>,
    modifyprop_timeout_ms: Option<CString>,
}
447
448impl CheckedSessionProps {
449 fn to_raw(&self) -> Vec<*const i8> {
450 let mut props = vec![
451 ffi::SOLCLIENT_SESSION_PROP_HOST.as_ptr() as *const i8,
452 self.host_name.as_ptr(),
453 ffi::SOLCLIENT_SESSION_PROP_VPN_NAME.as_ptr() as *const i8,
454 self.vpn_name.as_ptr(),
455 ffi::SOLCLIENT_SESSION_PROP_USERNAME.as_ptr() as *const i8,
456 self.username.as_ptr(),
457 ffi::SOLCLIENT_SESSION_PROP_PASSWORD.as_ptr() as *const i8,
458 self.password.as_ptr(),
459 ffi::SOLCLIENT_SESSION_PROP_CONNECT_BLOCKING.as_ptr() as *const i8,
460 ffi::SOLCLIENT_PROP_ENABLE_VAL.as_ptr() as *const i8,
461 ];
462
463 if let Some(x) = &self.buffer_size_bytes {
464 props.push(ffi::SOLCLIENT_SESSION_PROP_BUFFER_SIZE.as_ptr() as *const i8);
465 props.push(x.as_ptr());
466 }
467
468 if let Some(x) = &self.block_write_timeout_ms {
469 props.push(ffi::SOLCLIENT_SESSION_PROP_BLOCKING_WRITE_TIMEOUT_MS.as_ptr() as *const i8);
470 props.push(x.as_ptr());
471 }
472 if let Some(x) = &self.connect_timeout_ms {
473 props.push(ffi::SOLCLIENT_SESSION_PROP_CONNECT_TIMEOUT_MS.as_ptr() as *const i8);
474 props.push(x.as_ptr());
475 }
476
477 if let Some(x) = &self.subconfirm_timeout_ms {
478 props.push(ffi::SOLCLIENT_SESSION_PROP_SUBCONFIRM_TIMEOUT_MS.as_ptr() as *const i8);
479 props.push(x.as_ptr());
480 }
481 if let Some(x) = &self.ignore_dup_subscription_error {
482 props.push(
483 ffi::SOLCLIENT_SESSION_PROP_IGNORE_DUP_SUBSCRIPTION_ERROR.as_ptr() as *const i8,
484 );
485 props.push(bool_to_ptr(*x));
486 }
487
488 if let Some(x) = &self.tcp_nodelay {
489 props.push(ffi::SOLCLIENT_SESSION_PROP_TCP_NODELAY.as_ptr() as *const i8);
490 props.push(bool_to_ptr(*x));
491 }
492 if let Some(x) = &self.socket_send_buf_size_bytes {
493 props.push(ffi::SOLCLIENT_SESSION_PROP_SOCKET_SEND_BUF_SIZE.as_ptr() as *const i8);
494 props.push(x.as_ptr());
495 }
496
497 if let Some(x) = &self.socket_rcv_buf_size_bytes {
498 props.push(ffi::SOLCLIENT_SESSION_PROP_SOCKET_RCV_BUF_SIZE.as_ptr() as *const i8);
499 props.push(x.as_ptr());
500 }
501 if let Some(x) = &self.keep_alive_interval_ms {
502 props.push(ffi::SOLCLIENT_SESSION_PROP_KEEP_ALIVE_INT_MS.as_ptr() as *const i8);
503 props.push(x.as_ptr());
504 }
505 if let Some(x) = &self.keep_alive_limit {
506 props.push(ffi::SOLCLIENT_SESSION_PROP_KEEP_ALIVE_LIMIT.as_ptr() as *const i8);
507 props.push(x.as_ptr());
508 }
509 if let Some(x) = &self.application_description {
510 props.push(ffi::SOLCLIENT_SESSION_PROP_APPLICATION_DESCRIPTION.as_ptr() as *const i8);
511 props.push(x.as_ptr());
512 }
513 if let Some(x) = &self.client_name {
514 props.push(ffi::SOLCLIENT_SESSION_PROP_CLIENT_NAME.as_ptr() as *const i8);
515 props.push(x.as_ptr());
516 }
517
518 if let Some(x) = &self.compression_level {
519 props.push(ffi::SOLCLIENT_SESSION_PROP_COMPRESSION_LEVEL.as_ptr() as *const i8);
520 props.push(x.as_ptr());
521 }
522 if let Some(x) = &self.generate_rcv_timestamps {
523 props.push(ffi::SOLCLIENT_SESSION_PROP_GENERATE_RCV_TIMESTAMPS.as_ptr() as *const i8);
524 props.push(bool_to_ptr(*x));
525 }
526 if let Some(x) = &self.generate_send_timestamp {
527 props.push(ffi::SOLCLIENT_SESSION_PROP_GENERATE_SEND_TIMESTAMPS.as_ptr() as *const i8);
528 props.push(bool_to_ptr(*x));
529 }
530 if let Some(x) = &self.generate_sender_id {
531 props.push(ffi::SOLCLIENT_SESSION_PROP_GENERATE_SENDER_ID.as_ptr() as *const i8);
532 props.push(bool_to_ptr(*x));
533 }
534 if let Some(x) = &self.generate_sender_sequence_number {
535 props.push(ffi::SOLCLIENT_SESSION_PROP_GENERATE_SEQUENCE_NUMBER.as_ptr() as *const i8);
536 props.push(bool_to_ptr(*x));
537 }
538 if let Some(x) = &self.connect_retries_per_host {
539 props.push(ffi::SOLCLIENT_SESSION_PROP_CONNECT_RETRIES_PER_HOST.as_ptr() as *const i8);
540 props.push(x.as_ptr());
541 }
542 if let Some(x) = &self.connect_retries {
543 props.push(ffi::SOLCLIENT_SESSION_PROP_CONNECT_RETRIES.as_ptr() as *const i8);
544 props.push(x.as_ptr());
545 }
546 if let Some(x) = &self.reconnect_retries {
547 props.push(ffi::SOLCLIENT_SESSION_PROP_RECONNECT_RETRIES.as_ptr() as *const i8);
548 props.push(x.as_ptr());
549 }
550 if let Some(x) = &self.reconnect_retry_wait_ms {
551 props.push(ffi::SOLCLIENT_SESSION_PROP_RECONNECT_RETRY_WAIT_MS.as_ptr() as *const i8);
552 props.push(x.as_ptr());
553 }
554 if let Some(x) = &self.reapply_subscriptions {
555 props.push(ffi::SOLCLIENT_SESSION_PROP_REAPPLY_SUBSCRIPTIONS.as_ptr() as *const i8);
556 props.push(bool_to_ptr(*x));
557 }
558 if let Some(x) = &self.provision_timeout_ms {
559 props.push(ffi::SOLCLIENT_SESSION_PROP_PROVISION_TIMEOUT_MS.as_ptr() as *const i8);
560 props.push(x.as_ptr());
561 }
562 if let Some(x) = &self.calculate_message_expiration {
563 props.push(
564 ffi::SOLCLIENT_SESSION_PROP_CALCULATE_MESSAGE_EXPIRATION.as_ptr() as *const i8,
565 );
566 props.push(bool_to_ptr(*x));
567 }
568 if let Some(x) = &self.no_local {
569 props.push(ffi::SOLCLIENT_SESSION_PROP_NO_LOCAL.as_ptr() as *const i8);
570 props.push(bool_to_ptr(*x));
571 }
572 if let Some(x) = &self.modifyprop_timeout_ms {
573 props.push(ffi::SOLCLIENT_SESSION_PROP_MODIFYPROP_TIMEOUT_MS.as_ptr() as *const i8);
574 props.push(x.as_ptr());
575 }
576
577 props.push(ptr::null());
578
579 props
580 }
581}
582
583impl<Host, Vpn, Username, Password> TryFrom<UncheckedSessionProps<Host, Vpn, Username, Password>>
584 for CheckedSessionProps
585where
586 Host: Into<Vec<u8>>,
587 Vpn: Into<Vec<u8>>,
588 Username: Into<Vec<u8>>,
589 Password: Into<Vec<u8>>,
590{
591 type Error = SessionBuilderError;
592
593 fn try_from(
594 value: UncheckedSessionProps<Host, Vpn, Username, Password>,
595 ) -> std::prelude::v1::Result<Self, Self::Error> {
596 let host_name = match value.host_name {
597 Some(x) => CString::new(x)?,
598 None => {
599 return Err(SessionBuilderError::MissingRequiredArgs(
600 "host_name".to_owned(),
601 ));
602 }
603 };
604
605 let vpn_name = match value.vpn_name {
606 Some(x) => CString::new(x)?,
607 None => {
608 return Err(SessionBuilderError::MissingRequiredArgs(
609 "vpn_name".to_owned(),
610 ));
611 }
612 };
613
614 let username = match value.username {
615 Some(x) => CString::new(x)?,
616 None => {
617 return Err(SessionBuilderError::MissingRequiredArgs(
618 "username".to_owned(),
619 ));
620 }
621 };
622
623 let password = match value.password {
624 Some(x) => CString::new(x)?,
625 None => {
626 return Err(SessionBuilderError::MissingRequiredArgs(
627 "password".to_owned(),
628 ));
629 }
630 };
631
632 let client_name = match value.client_name {
633 Some(x) => Some(CString::new(x)?),
634 None => None,
635 };
636
637 let application_description = match value.application_description {
638 Some(x) => Some(CString::new(x)?),
639 None => None,
640 };
641
642 let buffer_size_bytes = match value.buffer_size_bytes {
643 Some(x) if x < 1 => {
644 return Err(SessionBuilderError::InvalidRange(
645 "buffer_size_bytes".to_owned(),
646 ">= 1".to_owned(),
647 x.to_string(),
648 ));
649 }
650 Some(b) => Some(CString::new(b.to_string())?),
651 None => None,
652 };
653
654 let block_write_timeout_ms = match value.block_write_timeout_ms {
655 Some(x) if x < 1 => {
656 return Err(SessionBuilderError::InvalidRange(
657 "block_write_timeout_ms".to_owned(),
658 ">= 1".to_owned(),
659 x.to_string(),
660 ));
661 }
662 Some(x) => Some(CString::new(x.to_string())?),
663 None => None,
664 };
665
666 let connect_timeout_ms = match value.connect_timeout_ms {
667 Some(x) if x < 1 => {
668 return Err(SessionBuilderError::InvalidRange(
669 "connect_timeout_ms".to_owned(),
670 ">= 1".to_owned(),
671 x.to_string(),
672 ));
673 }
674 Some(x) => Some(CString::new(x.to_string())?),
675 None => None,
676 };
677
678 let subconfirm_timeout_ms = match value.subconfirm_timeout_ms {
679 Some(x) if x < 1000 => {
680 return Err(SessionBuilderError::InvalidRange(
681 "subconfirm_timeout_ms".to_owned(),
682 ">= 1000".to_owned(),
683 x.to_string(),
684 ));
685 }
686 Some(x) => Some(CString::new(x.to_string())?),
687 None => None,
688 };
689
690 let socket_send_buf_size_bytes = match value.socket_send_buf_size_bytes {
691 Some(x) if x != 0 && x < 1024 => {
692 return Err(SessionBuilderError::InvalidRange(
693 "socket_send_buf_size_bytes".to_owned(),
694 "0 or >= 1024".to_owned(),
695 x.to_string(),
696 ));
697 }
698 Some(x) => Some(CString::new(x.to_string())?),
699 None => None,
700 };
701
702 let socket_rcv_buf_size_bytes = match value.socket_rcv_buf_size_bytes {
703 Some(x) if x != 0 && x < 1024 => {
704 return Err(SessionBuilderError::InvalidRange(
705 "socket_rcv_buf_size_bytes".to_owned(),
706 "0 or >= 1024".to_owned(),
707 x.to_string(),
708 ));
709 }
710 Some(x) => Some(CString::new(x.to_string())?),
711 None => None,
712 };
713
714 let keep_alive_interval_ms = match value.keep_alive_interval_ms {
715 Some(x) if x != 0 && x < 50 => {
716 return Err(SessionBuilderError::InvalidRange(
717 "keep_alive_interval_ms".to_owned(),
718 "0 or >= 50".to_owned(),
719 x.to_string(),
720 ));
721 }
722 Some(x) => Some(CString::new(x.to_string())?),
723 None => None,
724 };
725
726 let keep_alive_limit = match value.keep_alive_limit {
727 Some(x) if x < 3 => {
728 return Err(SessionBuilderError::InvalidRange(
729 "keep_alive_limit".to_owned(),
730 ">= 3".to_owned(),
731 x.to_string(),
732 ));
733 }
734 Some(x) => Some(CString::new(x.to_string())?),
735 None => None,
736 };
737
738 let compression_level = match value.compression_level {
739 Some(x) if x > 9 => {
740 return Err(SessionBuilderError::InvalidRange(
741 "compression_level".to_owned(),
742 "<= 9".to_owned(),
743 x.to_string(),
744 ));
745 }
746 Some(x) => Some(CString::new(x.to_string())?),
747 None => None,
748 };
749
750 let connect_retries_per_host = match value.connect_retries_per_host {
751 Some(x) if x < -1 => {
752 return Err(SessionBuilderError::InvalidRange(
753 "connect_retries_per_host".to_owned(),
754 ">= -1".to_owned(),
755 x.to_string(),
756 ));
757 }
758 Some(x) => Some(CString::new(x.to_string())?),
759 None => None,
760 };
761
762 let connect_retries = match value.connect_retries {
763 Some(x) if x < -1 => {
764 return Err(SessionBuilderError::InvalidRange(
765 "connect_retries ".to_owned(),
766 ">= -1".to_owned(),
767 x.to_string(),
768 ));
769 }
770 Some(x) => Some(CString::new(x.to_string())?),
771 None => None,
772 };
773
774 let reconnect_retries = match value.reconnect_retries {
775 Some(x) if x < -1 => {
776 return Err(SessionBuilderError::InvalidRange(
777 "reconnect_retries ".to_owned(),
778 ">= -1".to_owned(),
779 x.to_string(),
780 ));
781 }
782 Some(x) => Some(CString::new(x.to_string())?),
783 None => None,
784 };
785
786 let reconnect_retry_wait_ms = match value.reconnect_retry_wait_ms {
787 Some(x) => Some(CString::new(x.to_string())?),
788 None => None,
789 };
790
791 let provision_timeout_ms = match value.provision_timeout_ms {
792 Some(x) => Some(CString::new(x.to_string())?),
793 None => None,
794 };
795 let modifyprop_timeout_ms = match value.modifyprop_timeout_ms {
796 Some(x) => Some(CString::new(x.to_string())?),
797 None => None,
798 };
799
800 Ok(Self {
801 host_name,
802 vpn_name,
803 username,
804 password,
805 buffer_size_bytes,
806 block_write_timeout_ms,
807 connect_timeout_ms,
808 subconfirm_timeout_ms,
809 ignore_dup_subscription_error: value.ignore_dup_subscription_error,
810 tcp_nodelay: value.tcp_nodelay,
811 socket_send_buf_size_bytes,
812 socket_rcv_buf_size_bytes,
813 keep_alive_interval_ms,
814 keep_alive_limit,
815 application_description,
816 client_name,
817 compression_level,
818 generate_rcv_timestamps: value.generate_rcv_timestamps,
819 generate_send_timestamp: value.generate_send_timestamp,
820 generate_sender_id: value.generate_sender_id,
821 generate_sender_sequence_number: value.generate_sender_sequence_number,
822 connect_retries_per_host,
823 connect_retries,
824 reconnect_retries,
825 reconnect_retry_wait_ms,
826 reapply_subscriptions: value.reapply_subscriptions,
827 provision_timeout_ms,
828 calculate_message_expiration: value.calculate_message_expiration,
829 no_local: value.no_local,
830 modifyprop_timeout_ms,
831 })
832 }
833}