Struct nanomsg::Socket

pub struct Socket { /* fields omitted */ }

A type-safe socket wrapper around nanomsg's own socket implementation. This provides a safe interface for dealing with initializing the sockets, sending and receiving messages.

Methods

impl Socket

Allocate and initialize a new Nanomsg socket. nanomsg creates a new file descriptor behind the scenes, but the safe interface doesn't expose any of the underlying file descriptors.

Example

use nanomsg::{Socket, Protocol};

let mut socket = match Socket::new(Protocol::Pull) {
    Ok(socket) => socket,
    Err(err) => panic!("{}", err)
};

Error

  • AddressFamilyNotSupported : Specified address family is not supported.
  • InvalidArgument : Unknown protocol.
  • TooManyOpenFiles : The limit on the total number of open SP sockets or OS limit for file descriptors has been reached.
  • Terminating : The library is terminating.

Allocate and initialize a new Nanomsg socket meant to be used in a device.

Example

use nanomsg::{Socket, Protocol};

let mut s1 = Socket::new_for_device(Protocol::Req).unwrap();
let mut s2 = Socket::new_for_device(Protocol::Rep).unwrap();
let ep1 = s1.bind("ipc:///tmp/new_for_device1.ipc").unwrap();
let ep2 = s2.bind("ipc:///tmp/new_for_device2.ipc").unwrap();

// And now `Socket::device(&s1, &s2)` can be called to create the device.

Creating a new socket through Socket::new does not put that socket into a listening state. Instead, one has to be explicit in enabling the socket to listen on a specific address.

That's what the bind method does. Passing in a raw string such as "ipc:///tmp/pipeline.ipc" is supported.

Note: This does not block the current task. Keeping the task alive is up to the user of the library, typically by entering a loop.

Example

use nanomsg::{Socket, Protocol};

let mut socket = match Socket::new(Protocol::Push) {
    Ok(socket) => socket,
    Err(err) => panic!("{}", err)
};

// Bind the newly created socket to the following address:
match socket.bind("ipc:///tmp/bind_doc.ipc") {
    Ok(_) => {},
    Err(err) => panic!("Failed to bind socket: {}", err)
}

Error

  • BadFileDescriptor : The socket is invalid.
  • TooManyOpenFiles : Maximum number of active endpoints was reached.
  • InvalidArgument : The syntax of the supplied address is invalid.
  • NameTooLong : The supplied address is too long.
  • ProtocolNotSupported : The requested transport protocol is not supported.
  • AddressNotAvailable : The requested endpoint is not local.
  • NoDevice : Address specifies a nonexistent interface.
  • AddressInUse : The requested local endpoint is already in use.
  • Terminating : The library is terminating.
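The addresses used throughout these examples have the shape transport://location. As a rough illustration of that shape only, here is a minimal sketch using the standard library. `split_address` is a hypothetical helper, not part of the nanomsg crate; the real syntax check happens inside nanomsg and may be stricter.

```rust
/// Hypothetical helper: splits "transport://location" into its two parts.
/// Returns None when the separator is missing or either part is empty.
fn split_address(addr: &str) -> Option<(&str, &str)> {
    let (transport, location) = addr.split_once("://")?;
    if transport.is_empty() || location.is_empty() {
        return None;
    }
    Some((transport, location))
}

fn main() {
    // The IPC address used in the bind example.
    println!("{:?}", split_address("ipc:///tmp/bind_doc.ipc"));
    // No "://" separator, so this is rejected.
    println!("{:?}", split_address("/tmp/bind_doc.ipc"));
}
```

An address that fails a check like this is the kind of input one would expect bind to reject with InvalidArgument.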

Connects the socket to a remote endpoint. Returns the endpoint on success.

Example

use nanomsg::{Socket, Protocol};

let mut socket = match Socket::new(Protocol::Pull) {
    Ok(socket) => socket,
    Err(err) => panic!("{}", err)
};

let endpoint = match socket.connect("ipc:///tmp/connect_doc.ipc") {
    Ok(ep) => ep,
    Err(err) => panic!("Failed to connect socket: {}", err)
};

Error

  • BadFileDescriptor : The socket is invalid.
  • TooManyOpenFiles : Maximum number of active endpoints was reached.
  • InvalidArgument : The syntax of the supplied address is invalid.
  • NameTooLong : The supplied address is too long.
  • ProtocolNotSupported : The requested transport protocol is not supported.
  • NoDevice : Address specifies a nonexistent interface.
  • Terminating : The library is terminating.

Non-blocking version of the read function. Any bytes exceeding the length specified by buf.len() will be truncated. Returns the number of bytes of the message stored in the buffer on success. Note that this differs from nanomsg's nn_recv, which returns the full message size instead. An error with the Error::TryAgain kind is returned if there's no message to receive at the moment.

Example

use nanomsg::{Socket, Protocol, Error};

let mut socket = Socket::new(Protocol::Pull).unwrap();
let mut endpoint = socket.connect("ipc:///tmp/nb_read_doc.ipc").unwrap();
let mut buffer = [0u8; 1024];

match socket.nb_read(&mut buffer) {
    Ok(count) => {
        println!("Read {} bytes !", count);
        // here we can process the message stored in `buffer`
    },
    Err(Error::TryAgain) => {
        println!("Nothing to be read for the moment ...");
        // here we can use the CPU for something else and try again later
    },
    Err(err) => panic!("Problem while reading: {}", err)
};

Error

  • BadFileDescriptor : The socket is invalid.
  • OperationNotSupported : The operation is not supported by this socket type.
  • FileStateMismatch : The operation cannot be performed on this socket at the moment because socket is not in the appropriate state. This error may occur with socket types that switch between several states.
  • TryAgain : Non-blocking mode was requested and there’s no message to receive at the moment.
  • Interrupted : The operation was interrupted by delivery of a signal before the message was received.
  • Terminating : The library is terminating.
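Because TryAgain only means "nothing yet", callers usually retry. The following is a minimal sketch of such a retry loop that runs without nanomsg: `ReadError` and `read_with_retry` are hypothetical names, and the closure stands in for a call such as `socket.nb_read(&mut buffer)`.

```rust
use std::thread;
use std::time::Duration;

/// Stand-in for the outcomes that matter here; the real crate has its own
/// `Error` enum with a `TryAgain` variant.
#[derive(Debug, PartialEq)]
enum ReadError {
    TryAgain,
    Fatal,
}

/// Calls `try_read` until it yields data, fails with a hard error, or
/// `max_attempts` is exhausted. Sleeps `pause` between attempts.
fn read_with_retry<F>(mut try_read: F, max_attempts: u32, pause: Duration) -> Result<usize, ReadError>
where
    F: FnMut() -> Result<usize, ReadError>,
{
    for attempt in 1..=max_attempts {
        match try_read() {
            // Nothing available yet: wait a little, then try again.
            Err(ReadError::TryAgain) if attempt < max_attempts => thread::sleep(pause),
            // Data, a hard error, or the final TryAgain: hand it to the caller.
            other => return other,
        }
    }
    Err(ReadError::TryAgain)
}

fn main() {
    // Simulated socket: reports TryAgain twice, then delivers a 6-byte message.
    let mut calls = 0;
    let result = read_with_retry(
        || {
            calls += 1;
            if calls < 3 { Err(ReadError::TryAgain) } else { Ok(6) }
        },
        5,
        Duration::from_millis(1),
    );
    println!("{:?} after {} calls", result, calls);

    // A hard error is returned immediately, without retrying.
    let failed = read_with_retry(|| Err(ReadError::Fatal), 5, Duration::from_millis(1));
    println!("{:?}", failed);
}
```

Sleeping between attempts is only one option; a real program might do other work or use poll instead.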

Non-blocking version of the read_to_end function. Copy the message allocated by nanomsg into the buffer on success. An error with the Error::TryAgain kind is returned if there's no message to receive for the moment.

Example:

#![allow(unstable)]
use nanomsg::{Socket, Protocol, Error};

let mut socket = Socket::new(Protocol::Pull).unwrap();
let mut endpoint = socket.connect("ipc:///tmp/nb_read_to_end_doc.ipc").unwrap();

let mut buffer = Vec::new();
match socket.nb_read_to_end(&mut buffer) {
    Ok(_) => {
        println!("Read message {} bytes !", buffer.len());
        // here we can process the message stored in `buffer`
    },
    Err(Error::TryAgain) => {
        println!("Nothing to be read for the moment ...");
        // here we can use the CPU for something else and try again later
    },
    Err(err) => panic!("Problem while reading: {}", err)
};

Error

  • BadFileDescriptor : The socket is invalid.
  • OperationNotSupported : The operation is not supported by this socket type.
  • FileStateMismatch : The operation cannot be performed on this socket at the moment because socket is not in the appropriate state. This error may occur with socket types that switch between several states.
  • TryAgain : Non-blocking mode was requested and there’s no message to receive at the moment.
  • Interrupted : The operation was interrupted by delivery of a signal before the message was received.
  • Terminating : The library is terminating.

Non-blocking version of the write function. An error with the Error::TryAgain kind is returned if the message cannot be sent at the moment.

Example:

use nanomsg::{Socket, Protocol, Error};

let mut socket = Socket::new(Protocol::Push).unwrap();
let mut endpoint = socket.connect("ipc:///tmp/nb_write_doc.ipc").unwrap();

match socket.nb_write(b"foobar") {
    Ok(_) => { println!("Message sent !"); },
    Err(Error::TryAgain) => {
        println!("Receiver not ready, message can't be sent for the moment ...");
    },
    Err(err) => panic!("Problem while writing: {}", err)
};

Error

  • BadFileDescriptor : The socket is invalid.
  • OperationNotSupported : The operation is not supported by this socket type.
  • FileStateMismatch : The operation cannot be performed on this socket at the moment because socket is not in the appropriate state. This error may occur with socket types that switch between several states.
  • TryAgain : Non-blocking mode was requested and the message cannot be sent at the moment.
  • Interrupted : The operation was interrupted by delivery of a signal before the message was sent.
  • Terminating : The library is terminating.

Zero-copy version of the write function.

Example:

use nanomsg::{Socket, Protocol};
use std::io::{Read, Write};

let mut push_socket = Socket::new(Protocol::Push).unwrap();
let mut push_endpoint = push_socket.bind("ipc:///tmp/zc_write_doc.ipc").unwrap();
let mut pull_socket = Socket::new(Protocol::Pull).unwrap();
let mut pull_endpoint = pull_socket.connect("ipc:///tmp/zc_write_doc.ipc").unwrap();
let mut msg = Socket::allocate_msg(6).unwrap();
msg[0] = 102u8;
msg[1] = 111u8;
msg[2] = 111u8;
msg[3] = 98u8;
msg[4] = 97u8;
msg[5] = 114u8;

match push_socket.zc_write(msg) {
    Ok(_) => { println!("Message sent, do not try to reuse it !"); },
    Err(err) => panic!("Problem while writing: {}, msg still available", err)
};
let mut text = String::new();
match pull_socket.read_to_string(&mut text) {
    Ok(_) => { println!("Message received."); },
    Err(err) => panic!("{}", err)
}

Error

  • BadFileDescriptor : The socket is invalid.
  • OperationNotSupported : The operation is not supported by this socket type.
  • FileStateMismatch : The operation cannot be performed on this socket at the moment because socket is not in the appropriate state. This error may occur with socket types that switch between several states.
  • Interrupted : The operation was interrupted by delivery of a signal before the message was sent.
  • Terminating : The library is terminating.
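The example sets the six bytes of the message one at a time. Assuming the allocated message can be borrowed as a mutable byte slice (the indexing in the example suggests so, but that is an assumption), the payload can be copied in one call. In this sketch a plain Vec stands in for the allocated message and `fill_message` is a hypothetical helper.

```rust
/// Copies `payload` into the start of `msg`. Panics if `msg` is shorter than
/// `payload`, just as indexing past the end would.
fn fill_message(msg: &mut [u8], payload: &[u8]) {
    msg[..payload.len()].copy_from_slice(payload);
}

fn main() {
    // A plain Vec stands in for the buffer returned by `Socket::allocate_msg(6)`.
    let mut msg = vec![0u8; 6];
    fill_message(&mut msg, b"foobar");
    println!("{:?}", msg);
}
```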

Allocate a message of the specified size to be sent in zero-copy fashion. The content of the message is undefined after allocation and should be filled in by the user. While the write functions can send arbitrary buffers, buffers allocated using allocate_msg can be more efficient for large messages because they allow zero-copy techniques to be used.

Error

  • InvalidArgument : Supplied allocation type is invalid.
  • Unknown : Out of memory.

Deallocates a message allocated using the allocate_msg function.

Error

  • BadAddress : The message pointer is invalid.

Creates a poll request for the socket with the specified check criteria.

  • pollinout: See PollInOut for options

Checks a set of sockets and reports, for each socket, whether it’s possible to send a message to it and/or receive a message from it. Upon successful completion, the number of PollFd structures with events signaled is returned.

Example

use nanomsg::{Socket, Protocol, PollFd, PollRequest, PollInOut};
use std::thread;

let mut left_socket = Socket::new(Protocol::Pair).unwrap();
let mut left_ep = left_socket.bind("ipc:///tmp/poll_doc.ipc").unwrap();

let mut right_socket = Socket::new(Protocol::Pair).unwrap();
let mut right_ep = right_socket.connect("ipc:///tmp/poll_doc.ipc").unwrap();

thread::sleep(std::time::Duration::from_millis(10));

// Here some messages may have been sent ...

let mut pollfd_vec: Vec<PollFd> = vec![left_socket.new_pollfd(PollInOut::InOut), right_socket.new_pollfd(PollInOut::InOut)];
let mut poll_req = PollRequest::new(&mut pollfd_vec[..]);
let timeout = 10;
let poll_result = Socket::poll(&mut poll_req, timeout);

if poll_req.get_fds()[0].can_write() {
    // left_socket socket can send a message ...
}

if poll_req.get_fds()[1].can_read() {
    // right_socket socket is ready to receive a message ...
}

Error

  • BadFileDescriptor : Some of the provided sockets are invalid.
  • Interrupted : The operation was interrupted by delivery of a signal before any event was signaled.
  • Timeout : No event was signaled before the specified timeout.
  • Terminating : The library is terminating.
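To make the returned count and the can_read / can_write checks more concrete, here is a small model of how poll results can be interpreted. It is a sketch, not the crate's implementation: `Entry` and `signaled` are hypothetical, and the only borrowed facts are the event bit values from nanomsg's C header (NN_POLLIN is 1, NN_POLLOUT is 2).

```rust
// Event bits as defined by nanomsg's C header (NN_POLLIN, NN_POLLOUT).
const POLL_IN: i16 = 1;
const POLL_OUT: i16 = 2;

/// Hypothetical stand-in for one poll entry; only the returned events matter here.
struct Entry {
    revents: i16,
}

impl Entry {
    fn can_read(&self) -> bool {
        self.revents & POLL_IN != 0
    }

    fn can_write(&self) -> bool {
        self.revents & POLL_OUT != 0
    }
}

/// Number of entries with at least one event signaled.
fn signaled(entries: &[Entry]) -> usize {
    entries.iter().filter(|e| e.revents != 0).count()
}

fn main() {
    let entries = [
        Entry { revents: POLL_OUT },
        Entry { revents: POLL_IN | POLL_OUT },
        Entry { revents: 0 },
    ];
    println!("{} of {} entries signaled", signaled(&entries), entries.len());
    println!(
        "first entry: read={} write={}",
        entries[0].can_read(),
        entries[0].can_write()
    );
}
```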

Starts a device to forward messages between two sockets. If both sockets are valid, the device function loops and sends any messages received from s1 to s2 and vice versa. If only one socket is valid and the other is negative, the device works in a "loopback" mode: it loops and sends any messages received from the socket back to itself. To break the loop and make the device function exit, use the terminate function.

Error

  • BadFileDescriptor : Some of the provided sockets are invalid.
  • Interrupted : The operation was interrupted by delivery of a signal.
  • InvalidArgument : Either one of the sockets is not an AF_SP_RAW socket; or the two sockets don’t belong to the same protocol; or the directionality of the sockets doesn’t fit (e.g. attempt to join two SINK sockets to form a device).
  • Terminating : The library is terminating.

Notify all sockets about process termination. To help with shutdown of multi-threaded programs, nanomsg provides the terminate function, which informs all the open sockets that process termination is underway. If a socket is blocked inside a blocking function, such as read, it will be unblocked and a Terminating error will be returned to the user. Similarly, any subsequent attempt to invoke a socket function other than drop after terminate was called will result in a Terminating error. If waiting inside a polling function, the call will unblock with both read and write signaled. The terminate function itself is non-blocking.

Specifies how long the socket should try to send pending outbound messages after drop has been called, in milliseconds. Negative value means infinite linger. Default value is 1000 (1 second).

Size of the send buffer, in bytes. To prevent blocking for messages larger than the buffer, exactly one message may be buffered in addition to the data in the send buffer. Default value is 128kB.

Size of the receive buffer, in bytes. To prevent blocking for messages larger than the buffer, exactly one message may be buffered in addition to the data in the receive buffer. Default value is 128kB.

Maximum message size that can be received, in bytes. Negative value means that the received size is limited only by available addressable memory. Default is 1024kB.

The timeout for send operation on the socket. If message cannot be sent within the specified timeout, TryAgain error is returned. Negative value means infinite timeout. Default value is infinite timeout.

The timeout for recv operation on the socket. If message cannot be received within the specified timeout, TryAgain error is returned. Negative value means infinite timeout. Default value is infinite timeout.
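The linger option and the send and receive timeouts share one convention: a negative value means "no limit". A minimal sketch of that convention follows, assuming the values are plain millisecond counts. `timeout_from_millis` is a hypothetical helper, not a crate function.

```rust
use std::time::Duration;

/// Interprets a millisecond option value: negative means "wait forever",
/// which is represented here as `None`.
fn timeout_from_millis(ms: i32) -> Option<Duration> {
    if ms < 0 {
        None
    } else {
        Some(Duration::from_millis(ms as u64))
    }
}

fn main() {
    println!("{:?}", timeout_from_millis(-1)); // infinite
    println!("{:?}", timeout_from_millis(1000)); // the default linger value
}
```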

For connection-based transports such as TCP, this option specifies how long to wait, when a connection is broken, before trying to re-establish it. Note that the actual reconnect interval may be randomised to some extent to prevent severe reconnection storms. Default value is 100 milliseconds.

This option is to be used only in addition to set_reconnect_interval option. It specifies maximum reconnection interval. On each reconnect attempt, the previous interval is doubled until max_reconnect_interval is reached. Value of zero means that no exponential backoff is performed and reconnect interval is based only on reconnect_interval. If max_reconnect_interval is less than reconnect_interval, it is ignored. Default value is 0.
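The interaction between the two reconnect options is easier to see as a sequence of waits. The sketch below follows the rules stated in the text (doubling up to the maximum, zero disables backoff, a maximum below the base interval is ignored). `reconnect_schedule` is a hypothetical function, and it leaves out the randomisation that nanomsg may apply.

```rust
/// Returns the wait, in milliseconds, before each of the first `attempts`
/// reconnect attempts.
fn reconnect_schedule(interval: u32, max_interval: u32, attempts: usize) -> Vec<u32> {
    // Zero disables backoff; a maximum below the base interval is ignored.
    let backoff = max_interval != 0 && max_interval >= interval;
    let mut current = interval;
    let mut schedule = Vec::with_capacity(attempts);
    for _ in 0..attempts {
        schedule.push(current);
        if backoff {
            // Double the previous wait, but never exceed the maximum.
            current = current.saturating_mul(2).min(max_interval);
        }
    }
    schedule
}

fn main() {
    println!("{:?}", reconnect_schedule(100, 1000, 6));
    println!("{:?}", reconnect_schedule(100, 0, 3));
}
```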

Sets outbound priority for endpoints subsequently added to the socket. This option has no effect on socket types that send messages to all the peers. However, if the socket type sends each message to a single peer (or a limited set of peers), peers with high priority take precedence over peers with low priority. Highest priority is 1, lowest priority is 16. Default value is 8.

Sets inbound priority for endpoints subsequently added to the socket. This option has no effect on socket types that are not able to receive messages. When receiving a message, messages from peer with higher priority are received before messages from peer with lower priority. Highest priority is 1, lowest priority is 16. Default value is 8.
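Since 1 is the highest priority and 16 the lowest, "higher priority" means a numerically smaller value. The sketch below illustrates only that ordering. `Peer` and `preferred_peers` are hypothetical, and how nanomsg distributes messages among peers of equal priority is not modelled.

```rust
/// Hypothetical peer record: a name plus its priority (1 = highest, 16 = lowest).
#[derive(Debug, PartialEq)]
struct Peer {
    name: &'static str,
    priority: u8,
}

/// Returns the peers that share the best (numerically lowest) priority.
fn preferred_peers(peers: &[Peer]) -> Vec<&Peer> {
    match peers.iter().map(|p| p.priority).min() {
        Some(best) => peers.iter().filter(|p| p.priority == best).collect(),
        None => Vec::new(),
    }
}

fn main() {
    let peers = [
        Peer { name: "a", priority: 8 },
        Peer { name: "b", priority: 1 },
        Peer { name: "c", priority: 1 },
    ];
    // Peers "b" and "c" take precedence over "a", which keeps the default of 8.
    println!("{:?}", preferred_peers(&peers));
}
```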

If set to true, only IPv4 addresses are used. If set to false, both IPv4 and IPv6 addresses are used. Default value is true.

Socket name for error reporting and statistics. Default value is "socket.N", where N is the socket integer. This option is experimental; see Socket::env for details.

This option, when set to true, disables Nagle’s algorithm. It also disables delaying of TCP acknowledgments. Using this option improves latency at the expense of throughput.

Retrieve a file descriptor that is readable when a message can be received on the underlying socket.

Retrieve a file descriptor that is writeable when a message can be sent on the underlying socket.

Retrieve the name for this socket for error reporting and statistics. This option is experimental; see Socket::env for details.

Defined on full Sub socket. Subscribes for a particular topic. A single Sub socket can handle multiple subscriptions.

Defined on full Sub socket. Unsubscribes from a particular topic.
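A rough model of how several subscriptions combine, assuming nanomsg's prefix-based topic matching (a message is delivered when it starts with any subscribed topic, and an empty topic matches everything). `is_delivered` is a hypothetical function for illustration; the actual filtering is done inside nanomsg.

```rust
/// Returns true when `message` starts with any subscribed topic.
/// An empty topic matches every message.
fn is_delivered(subscriptions: &[&[u8]], message: &[u8]) -> bool {
    subscriptions.iter().any(|topic| message.starts_with(topic))
}

fn main() {
    let subscriptions: [&[u8]; 2] = [b"weather/", b"sports/"];
    println!("{}", is_delivered(&subscriptions, b"weather/rain"));
    println!("{}", is_delivered(&subscriptions, b"news/today"));
}
```

Unsubscribing corresponds to removing a topic from the list, after which messages matching only that topic are no longer delivered.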

Specifies how long to wait for responses to the survey. Once the deadline expires, receive function will return Timeout error and all subsequent responses to the survey will be silently dropped. The deadline is measured in milliseconds. Default value is 1 second.

This option is defined on the full Req socket. If reply is not received in specified amount of milliseconds, the request will be automatically resent. The type of this option is int. Default value is 1 minute.

Trait Implementations

impl Read for Socket

Receive a message from the socket and store it in the buffer argument. Any bytes exceeding the length specified by buffer.len() will be truncated. Returns the number of bytes of the message stored in the buffer on success. Note that this differs from nanomsg's nn_recv, which returns the full message size instead.

Example

use nanomsg::{Socket, Protocol};
use std::thread;
use std::io::{Read, Write};

let mut push_socket = Socket::new(Protocol::Push).unwrap();
let mut push_ep = push_socket.bind("ipc:///tmp/read_doc.ipc").unwrap();

let mut pull_socket = Socket::new(Protocol::Pull).unwrap();
let mut pull_ep = pull_socket.connect("ipc:///tmp/read_doc.ipc").unwrap();
let mut buffer = [0u8; 1024];

thread::sleep(std::time::Duration::from_millis(50));

match push_socket.write(b"foobar") {
    Ok(..) => println!("Message sent !"),
    Err(err) => panic!("Failed to write to the socket: {}", err)
}

match pull_socket.read(&mut buffer) {
    Ok(count) => {
        println!("Read {} bytes !", count);
        // here we can process the `count` bytes of the message stored in `buffer`
    },
    Err(err) => panic!("Problem while reading: {}", err)
};

Error

  • io::ErrorKind::FileNotFound : The socket is invalid.
  • io::ErrorKind::MismatchedFileTypeForOperation : The operation is not supported by this socket type.
  • io::ErrorKind::ResourceUnavailable : The operation cannot be performed on this socket at the moment because socket is not in the appropriate state. This error may occur with socket types that switch between several states.
  • io::ErrorKind::TimedOut : Individual socket types may define their own specific timeouts. If such timeout is hit this error will be returned.
  • io::ErrorKind::Interrupted : The operation was interrupted by delivery of a signal before the message was received.
  • io::ErrorKind::Other : The library is terminating.
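The truncation rule for read can be modelled with plain slices. In this sketch `store_truncated` is a hypothetical function that mimics the documented behaviour: it returns the number of bytes stored, not the size of the original message.

```rust
/// Copies as much of `message` as fits into `buf` and returns the number of
/// bytes stored, which may be smaller than `message.len()`.
fn store_truncated(message: &[u8], buf: &mut [u8]) -> usize {
    let stored = message.len().min(buf.len());
    buf[..stored].copy_from_slice(&message[..stored]);
    stored
}

fn main() {
    // A 4-byte buffer cannot hold the 6-byte message, so two bytes are dropped.
    let mut buf = [0u8; 4];
    let stored = store_truncated(b"foobar", &mut buf);
    println!("stored {} bytes: {:?}", stored, &buf[..stored]);
}
```

A caller that cannot afford to lose data would size the buffer for the largest expected message or use read_to_end instead.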

Receive a message from the socket. Copy the message allocated by nanomsg into the buffer on success.

Example:

use nanomsg::{Socket, Protocol};
use std::thread;
use std::io::{Read, Write};

let mut push_socket = Socket::new(Protocol::Push).unwrap();
let mut push_ep = push_socket.bind("ipc:///tmp/read_to_end_doc.ipc").unwrap();

let mut pull_socket = Socket::new(Protocol::Pull).unwrap();
let mut pull_ep = pull_socket.connect("ipc:///tmp/read_to_end_doc.ipc").unwrap();

thread::sleep(std::time::Duration::from_millis(50));

match push_socket.write(b"foobar") {
    Ok(..) => println!("Message sent !"),
    Err(err) => panic!("Failed to write to the socket: {}", err)
}

let mut msg = Vec::new();
match pull_socket.read_to_end(&mut msg) {
    Ok(_) => {
        println!("Read {} bytes !", msg.len());
        // here we can process the message stored in `msg`
    },
    Err(err) => panic!("Problem while reading: {}", err)
};

Error

  • io::ErrorKind::FileNotFound : The socket is invalid.
  • io::ErrorKind::MismatchedFileTypeForOperation : The operation is not supported by this socket type.
  • io::ErrorKind::ResourceUnavailable : The operation cannot be performed on this socket at the moment because socket is not in the appropriate state. This error may occur with socket types that switch between several states.
  • io::ErrorKind::TimedOut : Individual socket types may define their own specific timeouts. If such timeout is hit this error will be returned.
  • io::ErrorKind::Interrupted : The operation was interrupted by delivery of a signal before the message was received.
  • io::ErrorKind::Other : The library is terminating.

Receive a message from the socket. Copy the message allocated by nanomsg into the buffer on success. If the data in the message is not valid UTF-8 then an error is returned and buffer is unchanged.

Example:

use nanomsg::{Socket, Protocol};
use std::thread;
use std::io::{Read, Write};

let mut push_socket = Socket::new(Protocol::Push).unwrap();
let mut push_ep = push_socket.bind("ipc:///tmp/read_to_string_doc.ipc").unwrap();

let mut pull_socket = Socket::new(Protocol::Pull).unwrap();
let mut pull_ep = pull_socket.connect("ipc:///tmp/read_to_string_doc.ipc").unwrap();

thread::sleep(std::time::Duration::from_millis(50));

match push_socket.write(b"foobar") {
    Ok(..) => println!("Message sent !"),
    Err(err) => panic!("Failed to write to the socket: {}", err)
}

let mut msg = String::new();
match pull_socket.read_to_string(&mut msg) {
    Ok(_) => {
        println!("Read {} bytes !", msg.len());
        // here we can process the message stored in `msg`
    },
    Err(err) => panic!("Problem while reading: {}", err)
};

Error

  • io::ErrorKind::FileNotFound : The socket is invalid.
  • io::ErrorKind::MismatchedFileTypeForOperation : The operation is not supported by this socket type.
  • io::ErrorKind::ResourceUnavailable : The operation cannot be performed on this socket at the moment because socket is not in the appropriate state. This error may occur with socket types that switch between several states.
  • io::ErrorKind::TimedOut : Individual socket types may define their own specific timeouts. If such timeout is hit this error will be returned.
  • io::ErrorKind::Interrupted : The operation was interrupted by delivery of a signal before the message was received.
  • io::ErrorKind::Other : The library is terminating, or the message is not a valid UTF-8 string.
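The "buffer is unchanged on invalid UTF-8" guarantee can be sketched with the standard library alone. `append_utf8` is a hypothetical function, not the crate's implementation. It validates the whole message first and only then touches the output string, reporting invalid input as io::ErrorKind::Other to match the list above.

```rust
use std::io;

/// Appends `message` to `out` if it is valid UTF-8 and returns its length in
/// bytes. On invalid input `out` is left untouched.
fn append_utf8(message: Vec<u8>, out: &mut String) -> io::Result<usize> {
    // Validate everything before modifying `out`.
    let text = String::from_utf8(message)
        .map_err(|err| io::Error::new(io::ErrorKind::Other, err))?;
    out.push_str(&text);
    Ok(text.len())
}

fn main() {
    let mut out = String::new();
    println!("{:?}", append_utf8(b"foobar".to_vec(), &mut out));
    // 0xff can never appear in valid UTF-8, so this fails and `out` keeps its content.
    println!("{:?}", append_utf8(vec![0xff, 0xfe], &mut out).map_err(|e| e.kind()));
    println!("{}", out);
}
```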

🔬 This is a nightly-only experimental API. (read_initializer)

Determines if this Reader can work with buffers of uninitialized memory.

Read the exact number of bytes required to fill buf.

Creates a "by reference" adaptor for this instance of Read.

Transforms this Read instance to an Iterator over its bytes.

Deprecated since 1.27.0: Use str::from_utf8 instead: https://doc.rust-lang.org/nightly/std/str/struct.Utf8Error.html#examples

🔬 This is a nightly-only experimental API. (io: the semantics of a partial read/write of where errors happen is currently unclear and may change)

Transforms this Read instance to an Iterator over chars.

Creates an adaptor which will chain this stream with another.

Creates an adaptor which will read at most limit bytes from it.

impl Write for Socket

The function will send a message containing the data from the buf parameter to the socket. Which of the peers the message will be sent to is determined by the particular socket type.

Example:

use nanomsg::{Socket, Protocol};
use std::thread;
use std::io::{Read, Write};

let mut push_socket = Socket::new(Protocol::Push).unwrap();
let mut push_ep = push_socket.bind("ipc:///tmp/write_doc.ipc").unwrap();

let mut pull_socket = Socket::new(Protocol::Pull).unwrap();
let mut pull_ep = pull_socket.connect("ipc:///tmp/write_doc.ipc").unwrap();
let mut buffer = [0u8; 1024];

thread::sleep(std::time::Duration::from_millis(50));

match push_socket.write_all(b"foobar") {
    Ok(..) => println!("Message sent !"),
    Err(err) => panic!("Failed to write to the socket: {}", err)
}

match pull_socket.read(&mut buffer) {
    Ok(count) => {
        println!("Read {} bytes !", count);
        // here we can process the `count` bytes of the message stored in `buffer`
    },
    Err(err) => panic!("Problem while reading: {}", err)
};

Error

  • io::ErrorKind::FileNotFound : The socket is invalid.
  • io::ErrorKind::MismatchedFileTypeForOperation : The operation is not supported by this socket type.
  • io::ErrorKind::ResourceUnavailable : The operation cannot be performed on this socket at the moment because socket is not in the appropriate state. This error may occur with socket types that switch between several states.
  • io::ErrorKind::Interrupted : The operation was interrupted by delivery of a signal before the message was sent.
  • io::ErrorKind::TimedOut : Individual socket types may define their own specific timeouts. If such timeout is hit this error will be returned.
  • io::ErrorKind::Other : The library is terminating.

Flush this output stream, ensuring that all intermediately buffered contents reach their destination.

Attempts to write an entire buffer into this write.

Writes a formatted string into this writer, returning any error encountered.

Creates a "by reference" adaptor for this instance of Write.

impl Drop for Socket

Closes the socket. Any buffered inbound messages that were not yet received by the application will be discarded. The library will try to deliver any outstanding outbound messages for the time specified by set_linger. The call will block in the meantime.

Auto Trait Implementations

impl Send for Socket

impl Sync for Socket