1use crate::ErrorCode;
3use crate::error::error;
4use crate::msg::Message;
5use crate::aio::Aio;
6use crate::sys;
7use crate::str::String;
8use crate::options::{Options, Property};
9
10use core::pin::Pin;
11use core::ffi::c_int;
12use core::future::Future;
13use core::{mem, fmt, ops, ptr, task, marker, slice};
14
15use alloc::vec::Vec;
16
17type InitFn = unsafe extern "C" fn(msg: *mut sys::nng_socket) -> core::ffi::c_int;
18
/// Read-only view of a byte buffer: a raw start pointer and a length, tagged with
/// the lifetime `'a` of the borrow it was created from.
pub struct Buf<'a> {
    /// Address of the first byte of the buffer.
    ptr: *const u8,
    /// Number of bytes available starting at `ptr`.
    size: usize,
    /// Zero-sized marker that ties the raw pointer to the `'a` borrow.
    _lifetime: marker::PhantomData<&'a u8>,
}
27
28impl<'a> Buf<'a> {
29 #[inline]
30 const fn new(ptr: *const u8, size: usize) -> Self {
31 Self {
32 ptr,
33 size,
34 _lifetime: marker::PhantomData,
35 }
36 }
37}
38
39impl<'a> From<&'a [u8]> for Buf<'a> {
40 #[inline(always)]
41 fn from(value: &'a [u8]) -> Self {
42 Self::new(value.as_ptr(), value.len())
43 }
44}
45
46impl<'a, const N: usize> From<&'a [u8; N]> for Buf<'a> {
47 #[inline(always)]
48 fn from(value: &'a [u8; N]) -> Self {
49 Self::new(value.as_ptr(), value.len())
50 }
51}
52
53impl<'a> From<&'a [mem::MaybeUninit<u8>]> for Buf<'a> {
54 #[inline(always)]
55 fn from(value: &'a [mem::MaybeUninit<u8>]) -> Self {
56 Self::new(value.as_ptr() as _, value.len())
57 }
58}
59
/// Writable view of a byte buffer: a raw mutable start pointer and a capacity,
/// tagged with the lifetime `'a` of the borrow it was created from.
pub struct BufMut<'a> {
    /// Address of the first writable byte.
    ptr: *mut u8,
    /// Number of bytes that may be written starting at `ptr`.
    size: usize,
    /// Zero-sized marker that ties the raw pointer to the `'a` borrow.
    _lifetime: marker::PhantomData<&'a u8>,
}
68
69impl<'a> BufMut<'a> {
70 #[inline]
71 const fn new(ptr: *mut u8, size: usize) -> Self {
72 Self {
73 ptr,
74 size,
75 _lifetime: marker::PhantomData,
76 }
77 }
78}
79
80impl<'a> From<&'a mut Vec<u8>> for BufMut<'a> {
81 #[inline(always)]
82 fn from(value: &'a mut Vec<u8>) -> Self {
83 let value = value.spare_capacity_mut();
84 From::from(value)
85 }
86}
87
88impl<'a> From<&'a mut [u8]> for BufMut<'a> {
89 #[inline(always)]
90 fn from(value: &'a mut [u8]) -> Self {
91 Self::new(value.as_mut_ptr(), value.len())
92 }
93}
94
95impl<'a, const N: usize> From<&'a mut [u8; N]> for BufMut<'a> {
96 #[inline(always)]
97 fn from(value: &'a mut [u8; N]) -> Self {
98 Self::new(value.as_mut_ptr(), value.len())
99 }
100}
101
102impl<'a> From<&'a mut [mem::MaybeUninit<u8>]> for BufMut<'a> {
103 #[inline(always)]
104 fn from(value: &'a mut [mem::MaybeUninit<u8>]) -> Self {
105 Self::new(value.as_mut_ptr() as _, value.len())
106 }
107}
108
/// Options consumed by `Socket::connect_with`: start flags plus a value of
/// dialer options `T`.
#[derive(Clone, Default)]
pub struct ConnectOptions<T> {
    /// Flag bits handed to `Dialer::start`.
    flags: c_int,
    /// Options applied to the dialer before it is started.
    dialer: T,
}
115
116impl ConnectOptions<()> {
117 pub const fn new() -> Self {
119 Self {
120 flags: 0,
121 dialer: ()
122 }
123 }
124}
125
126impl<T> ConnectOptions<T> {
127 pub const fn with_async(mut self) -> Self {
131 self.flags = self.flags | sys::NNG_FLAG_NONBLOCK;
132 self
133 }
134
135 pub const fn with_dialer<R: Options<Dialer>>(&self, dialer: R) -> ConnectOptions<R> {
139 ConnectOptions {
140 flags: self.flags,
141 dialer
142 }
143 }
144}
145
146#[repr(transparent)]
147pub struct Socket(pub(crate) sys::nng_socket);
149
150impl Socket {
151 #[inline(always)]
152 fn with(init: InitFn) -> Result<Self, ErrorCode> {
153 let mut socket = sys::nng_socket {
154 id: 0
155 };
156
157 let result = unsafe {
158 (init)(&mut socket)
159 };
160
161 if result == 0 {
162 Ok(Self(socket))
163 } else {
164 Err(error(result))
165 }
166
167 }
168
169 #[inline(always)]
170 pub fn pair0() -> Result<Self, ErrorCode> {
172 Self::with(sys::nng_pair0_open)
173 }
174
175 #[inline(always)]
176 pub fn pair1() -> Result<Self, ErrorCode> {
178 Self::with(sys::nng_pair1_open)
179 }
180
181 #[inline(always)]
182 pub fn pub0() -> Result<Self, ErrorCode> {
184 Self::with(sys::nng_pub0_open)
185 }
186
187 #[inline(always)]
188 pub fn sub0() -> Result<Self, ErrorCode> {
190 Self::with(sys::nng_sub0_open)
191 }
192
193 #[inline(always)]
194 pub fn req0() -> Result<Self, ErrorCode> {
196 Self::with(sys::nng_req0_open)
197 }
198
199 #[inline(always)]
200 pub fn rep0() -> Result<Self, ErrorCode> {
202 Self::with(sys::nng_rep0_open)
203 }
204
205 #[inline(always)]
206 pub fn close(&self) -> bool {
211 unsafe {
212 sys::nng_close(self.0) == 0
213 }
214 }
215
216 #[inline]
217 pub fn listen(&self, url: String<'_>) -> Result<(), ErrorCode> {
219 self.listen_with(url, &())
220 }
221
222 #[inline]
223 pub fn listen_with<T: Options<Listener>>(&self, url: String<'_>, options: &T) -> Result<(), ErrorCode> {
228 let listener = Listener::new(self, url)?;
229 options.apply(&listener)?;
230 listener.start()?;
231
232 mem::forget(listener);
234
235 Ok(())
236 }
237
238 #[inline]
239 pub fn connect(&self, url: String<'_>) -> Result<(), ErrorCode> {
241 self.connect_with(url, ConnectOptions::new())
242 }
243
244 #[inline]
245 pub fn connect_with<T: Options<Dialer>>(&self, url: String<'_>, options: ConnectOptions<T>) -> Result<(), ErrorCode> {
247 let dialer = Dialer::new(self, url)?;
248 options.dialer.apply(&dialer)?;
249 dialer.start(options.flags)?;
250
251 mem::forget(dialer);
253
254 Ok(())
255 }
256
257 #[inline(always)]
258 pub fn set_opt<T: Options<Self>>(&self, opts: T) -> Result<(), ErrorCode> {
262 opts.apply(self)
263 }
264
265 #[inline(always)]
266 pub fn get_prop<T: Property<Self>>(&self) -> Result<T, ErrorCode> {
268 T::get(self)
269 }
270
271 fn recv_inner<'a, const FLAGS: c_int>(&self, out: BufMut<'a>) -> Result<&'a [u8], ErrorCode> {
272 let mut size = out.size;
273 let result = unsafe {
274 sys::nng_recv(**self, out.ptr as _, &mut size, FLAGS)
275 };
276
277 match result {
278 0 => {
279 let out = unsafe {
280 slice::from_raw_parts(out.ptr, size)
281 };
282 Ok(out)
283 },
284 code => Err(error(code)),
285 }
286 }
287
288 #[inline(always)]
289 pub fn try_recv<'a>(&self, out: impl Into<BufMut<'a>>) -> Result<&'a [u8], ErrorCode> {
299 self.recv_inner::<{sys::NNG_FLAG_NONBLOCK}>(out.into())
300 }
301
302 #[inline(always)]
303 pub fn recv<'a>(&self, out: impl Into<BufMut<'a>>) -> Result<&'a [u8], ErrorCode> {
309 self.recv_inner::<0>(out.into())
310 }
311
312 fn recv_msg_inner<const FLAGS: c_int>(&self) -> Result<Message, ErrorCode> {
316 let mut msg = ptr::null_mut();
317 let result = unsafe {
318 sys::nng_recvmsg(**self, &mut msg, FLAGS)
319 };
320
321 match ptr::NonNull::new(msg) {
322 Some(ptr) => Ok(Message(ptr)),
323 None => Err(error(result)),
324 }
325 }
326
327 #[inline]
328 pub fn recv_msg(&self) -> Result<Message, ErrorCode> {
332 self.recv_msg_inner::<0>()
333 }
334
335 #[inline]
336 pub fn try_recv_msg(&self) -> Result<Option<Message>, ErrorCode> {
342 match self.recv_msg_inner::<{sys::NNG_FLAG_NONBLOCK}>() {
343 Ok(msg) => Ok(Some(msg)),
344 Err(error) if error.is_would_block() => Ok(None),
345 Err(error) => Err(error)
346 }
347 }
348
349 #[inline]
350 pub fn recv_msg_async(&self) -> Result<FutureResp, ErrorCode> {
352 FutureResp::new(self)
353 }
354
355 #[inline]
356 pub fn send(&self, msg: Buf<'_>) -> Result<(), ErrorCode> {
360 let result = unsafe {
361 sys::nng_send(**self, msg.ptr as _, msg.size, 0)
362 };
363
364 match result {
365 0 => Ok(()),
366 code => Err(error(code)),
367 }
368 }
369
370 #[inline]
371 pub fn send_msg(&self, msg: Message) -> Result<(), (Message, ErrorCode)> {
376 let result = unsafe {
377 sys::nng_sendmsg(**self, msg.as_ptr(), 0)
378 };
379
380 match result {
381 0 => {
382 mem::forget(msg);
383 Ok(())
384 },
385 code => Err((msg, error(code))),
386 }
387 }
388
389 #[inline]
390 pub fn send_msg_async(&self, msg: Message) -> Result<FutureReq, ErrorCode> {
395 FutureReq::new(self, msg)
396 }
397}
398
399impl fmt::Debug for Socket {
400 #[inline(always)]
401 fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
402 fmt.write_fmt(format_args!("Socket(id={})", self.0.id))
403 }
404}
405
406impl Drop for Socket {
407 #[inline(always)]
408 fn drop(&mut self) {
409 self.close();
410 }
411}
412
413impl ops::Deref for Socket {
414 type Target = sys::nng_socket;
415
416 #[inline(always)]
417 fn deref(&self) -> &Self::Target {
418 &self.0
419 }
420}
421
422impl ops::DerefMut for Socket {
423 #[inline(always)]
424 fn deref_mut(&mut self) -> &mut Self::Target {
425 &mut self.0
426 }
427}
428
429pub struct FutureResp {
431 aio: Aio,
432}
433
434impl FutureResp {
435 pub fn new(socket: &Socket) -> Result<Self, ErrorCode> {
437 let aio = Aio::new()?;
438 unsafe {
439 sys::nng_recv_aio(**socket, aio.as_ptr())
440 }
441
442 Ok(Self {
443 aio
444 })
445 }
446
447 pub fn cancel(&self) {
449 unsafe {
450 sys::nng_aio_cancel(self.aio.as_ptr())
451 }
452 }
453}
454
455impl Future for FutureResp {
456 type Output = Result<Option<Message>, ErrorCode>;
457
458 fn poll(mut self: Pin<&mut Self>, ctx: &mut task::Context<'_>) -> task::Poll<Self::Output> {
459 let mut this = self.as_mut();
460 if this.aio.is_ready() {
461 task::Poll::Ready(this.aio.get_msg())
462 } else {
463 this.aio.register_waker(ctx.waker());
464 task::Poll::Pending
465 }
466 }
467}
468
469pub struct FutureReq {
471 aio: Aio,
472}
473
474impl FutureReq {
475 pub fn new(socket: &Socket, msg: Message) -> Result<Self, ErrorCode> {
477 let aio = Aio::new()?;
478 unsafe {
479 sys::nng_aio_set_msg(aio.as_ptr(), msg.as_ptr());
480 sys::nng_send_aio(**socket, aio.as_ptr())
481 }
482
483 mem::forget(msg);
485
486 Ok(Self {
487 aio
488 })
489 }
490
491 pub fn cancel(&self) {
493 unsafe {
494 sys::nng_aio_cancel(self.aio.as_ptr())
495 }
496 }
497}
498
499impl Future for FutureReq {
500 type Output = Result<(), (Message, ErrorCode)>;
501
502 fn poll(mut self: Pin<&mut Self>, ctx: &mut task::Context<'_>) -> task::Poll<Self::Output> {
503 let mut this = self.as_mut();
504 if this.aio.is_ready() {
505 task::Poll::Ready(this.aio.get_send_result())
506 } else {
507 this.aio.register_waker(ctx.waker());
508 task::Poll::Pending
509 }
510 }
511}
512
513pub struct Listener(pub(crate) sys::nng_listener);
515
516impl Listener {
517 pub(crate) fn new(socket: &Socket, url: String<'_>) -> Result<Self, ErrorCode> {
518 let url = url.as_ptr();
519 let mut this = sys::nng_listener {
520 id: 0
521 };
522
523 let result = unsafe {
524 sys::nng_listener_create(&mut this, **socket, url as _)
525 };
526
527 match result {
528 0 => Ok(Self(this)),
529 code => Err(error(code))
530 }
531 }
532
533 pub(crate) fn start(&self) -> Result<(), ErrorCode> {
534 let result = unsafe {
535 sys::nng_listener_start(self.0, 0)
536 };
537
538 match result {
539 0 => Ok(()),
540 code => Err(error(code))
541 }
542 }
543}
544
545impl Drop for Listener {
546 #[inline]
547 fn drop(&mut self) {
548 unsafe {
549 sys::nng_listener_close(self.0);
550 }
551 }
552}
553
554pub struct Dialer(pub(crate) sys::nng_dialer);
556
557impl Dialer {
558 pub(crate) fn new(socket: &Socket, url: String<'_>) -> Result<Self, ErrorCode> {
559 let url = url.as_ptr();
560 let mut this = sys::nng_dialer {
561 id: 0
562 };
563
564 let result = unsafe {
565 sys::nng_dialer_create(&mut this, **socket, url as _)
566 };
567
568 match result {
569 0 => Ok(Self(this)),
570 code => Err(error(code))
571 }
572 }
573
574 pub(crate) fn start(&self, flags: c_int) -> Result<(), ErrorCode> {
575 let result = unsafe {
576 sys::nng_dialer_start(self.0, flags)
577 };
578
579 match result {
580 0 => Ok(()),
581 code => Err(error(code))
582 }
583 }
584}
585
586impl Drop for Dialer {
587 #[inline]
588 fn drop(&mut self) {
589 unsafe {
590 sys::nng_dialer_close(self.0);
591 }
592 }
593}