nng_c/
socket.rs

1//!Socket module
2use crate::ErrorCode;
3use crate::error::error;
4use crate::msg::Message;
5use crate::aio::Aio;
6use crate::sys;
7use crate::str::String;
8use crate::options::{Options, Property};
9
10use core::pin::Pin;
11use core::ffi::c_int;
12use core::future::Future;
13use core::{mem, fmt, ops, ptr, task, marker, slice};
14
15use alloc::vec::Vec;
16
17type InitFn = unsafe extern "C" fn(msg: *mut sys::nng_socket) -> core::ffi::c_int;
18
///Read-only view over a run of bytes, kept as a raw pointer plus a length.
///
///Create it with `From`/`Into` from `&[u8]`, `&[u8; N]` or `&[MaybeUninit<u8>]`.
///The lifetime `'a` ties the view to the borrow it was created from.
pub struct Buf<'a> {
    ptr: *const u8,
    size: usize,
    _lifetime: marker::PhantomData<&'a u8>,
}

impl<'a> Buf<'a> {
    ///Assembles the view from its raw parts.
    #[inline]
    const fn new(ptr: *const u8, size: usize) -> Self {
        Buf { ptr, size, _lifetime: marker::PhantomData }
    }
}

///View over a byte slice.
impl<'a> From<&'a [u8]> for Buf<'a> {
    #[inline(always)]
    fn from(bytes: &'a [u8]) -> Self {
        let (start, len) = (bytes.as_ptr(), bytes.len());
        Self::new(start, len)
    }
}

///View over a fixed-size byte array; the length is the array length `N`.
impl<'a, const N: usize> From<&'a [u8; N]> for Buf<'a> {
    #[inline(always)]
    fn from(bytes: &'a [u8; N]) -> Self {
        Self::new(bytes.as_ptr(), N)
    }
}

///View over a slice of `MaybeUninit<u8>`, reinterpreted as plain bytes.
impl<'a> From<&'a [mem::MaybeUninit<u8>]> for Buf<'a> {
    #[inline(always)]
    fn from(bytes: &'a [mem::MaybeUninit<u8>]) -> Self {
        let start = bytes.as_ptr().cast::<u8>();
        Self::new(start, bytes.len())
    }
}
59
///Writable view over a run of bytes, kept as a raw pointer plus a length.
///
///Create it with `From`/`Into` from `&mut [u8]`, `&mut [u8; N]`, `&mut [MaybeUninit<u8>]`
///or `&mut Vec<u8>`.
///
///The `Vec` conversion exposes only the vector's spare capacity (see `Vec::spare_capacity_mut`);
///it does not touch the vector's length.
pub struct BufMut<'a> {
    ptr: *mut u8,
    size: usize,
    _lifetime: marker::PhantomData<&'a u8>,
}

impl<'a> BufMut<'a> {
    ///Assembles the view from its raw parts.
    #[inline]
    const fn new(ptr: *mut u8, size: usize) -> Self {
        BufMut { ptr, size, _lifetime: marker::PhantomData }
    }
}

///View over the unused (spare) capacity of the vector.
impl<'a> From<&'a mut Vec<u8>> for BufMut<'a> {
    #[inline(always)]
    fn from(vec: &'a mut Vec<u8>) -> Self {
        Self::from(vec.spare_capacity_mut())
    }
}

///View over a mutable byte slice.
impl<'a> From<&'a mut [u8]> for BufMut<'a> {
    #[inline(always)]
    fn from(bytes: &'a mut [u8]) -> Self {
        let len = bytes.len();
        Self::new(bytes.as_mut_ptr(), len)
    }
}

///View over a mutable fixed-size byte array; the length is the array length `N`.
impl<'a, const N: usize> From<&'a mut [u8; N]> for BufMut<'a> {
    #[inline(always)]
    fn from(bytes: &'a mut [u8; N]) -> Self {
        Self::new(bytes.as_mut_ptr(), N)
    }
}

///View over a mutable slice of `MaybeUninit<u8>`, reinterpreted as plain bytes.
impl<'a> From<&'a mut [mem::MaybeUninit<u8>]> for BufMut<'a> {
    #[inline(always)]
    fn from(bytes: &'a mut [mem::MaybeUninit<u8>]) -> Self {
        let len = bytes.len();
        Self::new(bytes.as_mut_ptr().cast::<u8>(), len)
    }
}
108
///Connect options
///
///Holds the flags passed when the dialer is started, together with a
///dialer-specific options value of type `T`.
#[derive(Clone, Default)]
pub struct ConnectOptions<T> {
    flags: c_int,
    dialer: T,
}

impl ConnectOptions<()> {
    ///Initializes default connect options: no flags set and no dialer options.
    pub const fn new() -> Self {
        ConnectOptions { flags: 0, dialer: () }
    }
}
125
126impl<T> ConnectOptions<T> {
127    ///Sets async mode, making connection to be performed in background
128    ///
129    ///By default, connection blocks, until socket is connected to the remote peer.
130    pub const fn with_async(mut self) -> Self {
131        self.flags = self.flags | sys::NNG_FLAG_NONBLOCK;
132        self
133    }
134
135    ///Creates new options with custom dialer options
136    ///
137    ///This is useful to provide TLS config
138    pub const fn with_dialer<R: Options<Dialer>>(&self, dialer: R) -> ConnectOptions<R> {
139        ConnectOptions {
140            flags: self.flags,
141            dialer
142        }
143    }
144}
145
146#[repr(transparent)]
147///Generic socket type
148pub struct Socket(pub(crate) sys::nng_socket);
149
150impl Socket {
151    #[inline(always)]
152    fn with(init: InitFn) -> Result<Self, ErrorCode> {
153        let mut socket = sys::nng_socket {
154            id: 0
155        };
156
157        let result = unsafe {
158            (init)(&mut socket)
159        };
160
161        if result == 0 {
162            Ok(Self(socket))
163        } else {
164            Err(error(result))
165        }
166
167    }
168
169    #[inline(always)]
170    ///Creates new version 0 pair socket
171    pub fn pair0() -> Result<Self, ErrorCode> {
172        Self::with(sys::nng_pair0_open)
173    }
174
175    #[inline(always)]
176    ///Creates new version 1 pair socket
177    pub fn pair1() -> Result<Self, ErrorCode> {
178        Self::with(sys::nng_pair1_open)
179    }
180
181    #[inline(always)]
182    ///Creates new version 0 publisher socket
183    pub fn pub0() -> Result<Self, ErrorCode> {
184        Self::with(sys::nng_pub0_open)
185    }
186
187    #[inline(always)]
188    ///Creates new version 0 subscriber socket
189    pub fn sub0() -> Result<Self, ErrorCode> {
190        Self::with(sys::nng_sub0_open)
191    }
192
193    #[inline(always)]
194    ///Creates new version 0 request socket
195    pub fn req0() -> Result<Self, ErrorCode> {
196        Self::with(sys::nng_req0_open)
197    }
198
199    #[inline(always)]
200    ///Creates new version 0 reply socket
201    pub fn rep0() -> Result<Self, ErrorCode> {
202        Self::with(sys::nng_rep0_open)
203    }
204
205    #[inline(always)]
206    ///Closes socket.
207    ///
208    ///Returns `true` if operation had effect
209    ///Otherwise, if socket is already closed, returns `false`
210    pub fn close(&self) -> bool {
211        unsafe {
212            sys::nng_close(self.0) == 0
213        }
214    }
215
216    #[inline]
217    ///Binds socket to the specified `url`, starting to listen for incoming messages.
218    pub fn listen(&self, url: String<'_>) -> Result<(), ErrorCode> {
219        self.listen_with(url, &())
220    }
221
222    #[inline]
223    ///Binds socket to the specified `url`, starting to listen for incoming messages.
224    ///
225    ///Allows to provide custom options to initialize listener with.
226    ///Mostly useful to set optional TLS config
227    pub fn listen_with<T: Options<Listener>>(&self, url: String<'_>, options: &T) -> Result<(), ErrorCode> {
228        let listener = Listener::new(self, url)?;
229        options.apply(&listener)?;
230        listener.start()?;
231
232        //Listener will be assigned to the socket and can be closed by it
233        mem::forget(listener);
234
235        Ok(())
236    }
237
238    #[inline]
239    ///Connects to the remote peer via `url`.
240    pub fn connect(&self, url: String<'_>) -> Result<(), ErrorCode> {
241        self.connect_with(url, ConnectOptions::new())
242    }
243
244    #[inline]
245    ///Connects to the remote peer via `url`, with custom options settings
246    pub fn connect_with<T: Options<Dialer>>(&self, url: String<'_>, options: ConnectOptions<T>) -> Result<(), ErrorCode> {
247        let dialer = Dialer::new(self, url)?;
248        options.dialer.apply(&dialer)?;
249        dialer.start(options.flags)?;
250
251        //Dialer will be assigned to the socket and can be closed by it
252        mem::forget(dialer);
253
254        Ok(())
255    }
256
257    #[inline(always)]
258    ///Sets options on the socket
259    ///
260    ///It is user responsibility to use options that are valid for the protocol of use
261    pub fn set_opt<T: Options<Self>>(&self, opts: T) -> Result<(), ErrorCode> {
262        opts.apply(self)
263    }
264
265    #[inline(always)]
266    ///Get property of the socket
267    pub fn get_prop<T: Property<Self>>(&self) -> Result<T, ErrorCode> {
268        T::get(self)
269    }
270
271    fn recv_inner<'a, const FLAGS: c_int>(&self, out: BufMut<'a>) -> Result<&'a [u8], ErrorCode> {
272        let mut size = out.size;
273        let result = unsafe {
274            sys::nng_recv(**self, out.ptr as _, &mut size, FLAGS)
275        };
276
277        match result {
278            0 => {
279                let out = unsafe {
280                    slice::from_raw_parts(out.ptr, size)
281                };
282                Ok(out)
283            },
284            code => Err(error(code)),
285        }
286    }
287
288    #[inline(always)]
289    ///Attempts to receive message, writing it in `out` buffer if it is of sufficient size,
290    ///returning immediately if no message is available
291    ///
292    ///If underlying protocol doesn't support receiving messages, this shall return error always
293    ///
294    ///Returns written bytes on success
295    ///
296    ///Returns [would block](https://docs.rs/error-code/3.2.0/error_code/struct.ErrorCode.html#method.is_would_block)
297    ///error if no message is available.
298    pub fn try_recv<'a>(&self, out: impl Into<BufMut<'a>>) -> Result<&'a [u8], ErrorCode> {
299        self.recv_inner::<{sys::NNG_FLAG_NONBLOCK}>(out.into())
300    }
301
302    #[inline(always)]
303    ///Receives message, writing it in `out` buffer if it is of sufficient size, waiting forever if none is available.
304    ///
305    ///If underlying protocol doesn't support receiving messages, this shall return error always
306    ///
307    ///Returns written bytes on success
308    pub fn recv<'a>(&self, out: impl Into<BufMut<'a>>) -> Result<&'a [u8], ErrorCode> {
309        self.recv_inner::<0>(out.into())
310    }
311
312    ///Receives pending message, waiting forever if none is available.
313    ///
314    ///If underlying protocol doesn't support receiving messages, this shall return error always
315    fn recv_msg_inner<const FLAGS: c_int>(&self) -> Result<Message, ErrorCode> {
316        let mut msg = ptr::null_mut();
317        let result = unsafe {
318            sys::nng_recvmsg(**self, &mut msg, FLAGS)
319        };
320
321        match ptr::NonNull::new(msg) {
322            Some(ptr) => Ok(Message(ptr)),
323            None => Err(error(result)),
324        }
325    }
326
327    #[inline]
328    ///Receives pending message, waiting forever if none is available.
329    ///
330    ///If underlying protocol doesn't support receiving messages, this shall return error always
331    pub fn recv_msg(&self) -> Result<Message, ErrorCode> {
332        self.recv_msg_inner::<0>()
333    }
334
335    #[inline]
336    ///Receives pending message, waiting forever if none is available.
337    ///
338    ///If underlying protocol doesn't support receiving messages, this shall return error always
339    ///
340    ///Returns None if no message is available.
341    pub fn try_recv_msg(&self) -> Result<Option<Message>, ErrorCode> {
342        match self.recv_msg_inner::<{sys::NNG_FLAG_NONBLOCK}>() {
343            Ok(msg) => Ok(Some(msg)),
344            Err(error) if error.is_would_block() => Ok(None),
345            Err(error) => Err(error)
346        }
347    }
348
349    #[inline]
350    ///Creates new future that attempts to receive message from the socket.
351    pub fn recv_msg_async(&self) -> Result<FutureResp, ErrorCode> {
352        FutureResp::new(self)
353    }
354
355    #[inline]
356    ///Encodes bytes into message and send it over the socket.
357    ///
358    ///Internally message shall be encoded and sent over
359    pub fn send(&self, msg: Buf<'_>) -> Result<(), ErrorCode> {
360        let result = unsafe {
361            sys::nng_send(**self, msg.ptr as _, msg.size, 0)
362        };
363
364        match result {
365            0 => Ok(()),
366            code => Err(error(code)),
367        }
368    }
369
370    #[inline]
371    ///Sends message over the socket.
372    ///
373    ///If successful takes ownership of message.
374    ///Otherwise returns message with error code.
375    pub fn send_msg(&self, msg: Message) -> Result<(), (Message, ErrorCode)> {
376        let result = unsafe {
377            sys::nng_sendmsg(**self, msg.as_ptr(), 0)
378        };
379
380        match result {
381            0 => {
382                mem::forget(msg);
383                Ok(())
384            },
385            code => Err((msg, error(code))),
386        }
387    }
388
389    #[inline]
390    ///Sends message over the socket asynchronously.
391    ///
392    ///If successful takes ownership of message.
393    ///Otherwise returns message with error code.
394    pub fn send_msg_async(&self, msg: Message) -> Result<FutureReq, ErrorCode> {
395        FutureReq::new(self, msg)
396    }
397}
398
399impl fmt::Debug for Socket {
400    #[inline(always)]
401    fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
402        fmt.write_fmt(format_args!("Socket(id={})", self.0.id))
403    }
404}
405
406impl Drop for Socket {
407    #[inline(always)]
408    fn drop(&mut self) {
409        self.close();
410    }
411}
412
413impl ops::Deref for Socket {
414    type Target = sys::nng_socket;
415
416    #[inline(always)]
417    fn deref(&self) -> &Self::Target {
418        &self.0
419    }
420}
421
422impl ops::DerefMut for Socket {
423    #[inline(always)]
424    fn deref_mut(&mut self) -> &mut Self::Target {
425        &mut self.0
426    }
427}
428
429///Futures that resolves into message
430pub struct FutureResp {
431    aio: Aio,
432}
433
434impl FutureResp {
435    ///Creates new future to retrieve message from the socket
436    pub fn new(socket: &Socket) -> Result<Self, ErrorCode> {
437        let aio = Aio::new()?;
438        unsafe {
439            sys::nng_recv_aio(**socket, aio.as_ptr())
440        }
441
442        Ok(Self {
443            aio
444        })
445    }
446
447    ///Sets future for cancelling
448    pub fn cancel(&self) {
449        unsafe {
450            sys::nng_aio_cancel(self.aio.as_ptr())
451        }
452    }
453}
454
455impl Future for FutureResp {
456    type Output = Result<Option<Message>, ErrorCode>;
457
458    fn poll(mut self: Pin<&mut Self>, ctx: &mut task::Context<'_>) -> task::Poll<Self::Output> {
459        let mut this = self.as_mut();
460        if this.aio.is_ready() {
461            task::Poll::Ready(this.aio.get_msg())
462        } else {
463            this.aio.register_waker(ctx.waker());
464            task::Poll::Pending
465        }
466    }
467}
468
469///Futures that awaits message to be sent
470pub struct FutureReq {
471    aio: Aio,
472}
473
474impl FutureReq {
475    ///Creates new future taking ownership over `msg`
476    pub fn new(socket: &Socket, msg: Message) -> Result<Self, ErrorCode> {
477        let aio = Aio::new()?;
478        unsafe {
479            sys::nng_aio_set_msg(aio.as_ptr(), msg.as_ptr());
480            sys::nng_send_aio(**socket, aio.as_ptr())
481        }
482
483        //AIO takes ownership of the message
484        mem::forget(msg);
485
486        Ok(Self {
487            aio
488        })
489    }
490
491    ///Sets future for cancelling
492    pub fn cancel(&self) {
493        unsafe {
494            sys::nng_aio_cancel(self.aio.as_ptr())
495        }
496    }
497}
498
499impl Future for FutureReq {
500    type Output = Result<(), (Message, ErrorCode)>;
501
502    fn poll(mut self: Pin<&mut Self>, ctx: &mut task::Context<'_>) -> task::Poll<Self::Output> {
503        let mut this = self.as_mut();
504        if this.aio.is_ready() {
505            task::Poll::Ready(this.aio.get_send_result())
506        } else {
507            this.aio.register_waker(ctx.waker());
508            task::Poll::Pending
509        }
510    }
511}
512
513///Socket listener
514pub struct Listener(pub(crate) sys::nng_listener);
515
516impl Listener {
517    pub(crate) fn new(socket: &Socket, url: String<'_>) -> Result<Self, ErrorCode> {
518        let url = url.as_ptr();
519        let mut this = sys::nng_listener {
520            id: 0
521        };
522
523        let result = unsafe {
524            sys::nng_listener_create(&mut this, **socket, url as _)
525        };
526
527        match result {
528            0 => Ok(Self(this)),
529            code => Err(error(code))
530        }
531    }
532
533    pub(crate) fn start(&self) -> Result<(), ErrorCode> {
534        let result = unsafe {
535            sys::nng_listener_start(self.0, 0)
536        };
537
538        match result {
539            0 => Ok(()),
540            code => Err(error(code))
541        }
542    }
543}
544
545impl Drop for Listener {
546    #[inline]
547    fn drop(&mut self) {
548        unsafe {
549            sys::nng_listener_close(self.0);
550        }
551    }
552}
553
554///Socket dialer
555pub struct Dialer(pub(crate) sys::nng_dialer);
556
557impl Dialer {
558    pub(crate) fn new(socket: &Socket, url: String<'_>) -> Result<Self, ErrorCode> {
559        let url = url.as_ptr();
560        let mut this = sys::nng_dialer {
561            id: 0
562        };
563
564        let result = unsafe {
565            sys::nng_dialer_create(&mut this, **socket, url as _)
566        };
567
568        match result {
569            0 => Ok(Self(this)),
570            code => Err(error(code))
571        }
572    }
573
574    pub(crate) fn start(&self, flags: c_int) -> Result<(), ErrorCode> {
575        let result = unsafe {
576            sys::nng_dialer_start(self.0, flags)
577        };
578
579        match result {
580            0 => Ok(()),
581            code => Err(error(code))
582        }
583    }
584}
585
586impl Drop for Dialer {
587    #[inline]
588    fn drop(&mut self) {
589        unsafe {
590            sys::nng_dialer_close(self.0);
591        }
592    }
593}