Crate tk_listen

A library for listening on network sockets with proper resource limits and error handling.

The library consists of two simple combinators:

  • sleep_on_error -- filters a Stream of accepted sockets for errors. Simple errors like ConnectionReset are just ignored. Severe errors like "Too many files open" delay the next accept() call by the specified delay, which lets other connections be processed and release resources for new ones. Replaces code like this.
  • listen -- iterates over a stream using the buffer_unordered combinator. It also suppresses errors in futures (because otherwise every connection error would shut down the whole stream) and returns a ForEach-like future that you can run() or combine with other futures. Stands for code like this.
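
The error handling described for sleep_on_error can be pictured with a small std-only sketch. This is an illustration of the idea, not the crate's actual code: the helper names is_connection_error and pause_after are hypothetical, and the exact set of error kinds the crate treats as ignorable is an assumption here.

```rust
use std::io;
use std::time::Duration;

/// Returns true for per-connection errors that say nothing about the
/// listener itself, so the next accept() can be attempted right away.
/// (Assumed set of kinds; the crate may classify errors differently.)
fn is_connection_error(e: &io::Error) -> bool {
    matches!(
        e.kind(),
        io::ErrorKind::ConnectionRefused
            | io::ErrorKind::ConnectionAborted
            | io::ErrorKind::ConnectionReset
    )
}

/// How long to pause before the next accept(): None for ignorable errors,
/// Some(delay) for anything else (for example, running out of file
/// descriptors), so that existing connections get a chance to finish.
fn pause_after(e: &io::Error, delay: Duration) -> Option<Duration> {
    if is_connection_error(e) {
        None
    } else {
        Some(delay)
    }
}
```

An accept loop built on these helpers would skip the socket when pause_after returns None and wait for the returned duration otherwise, which is roughly the behaviour the combinator packages up for a Stream.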

Example

A simple example looks like this:

use std::time::Duration;
use futures::{Future, Stream};
use tokio_core::net::TcpListener;
use tokio_core::reactor::Core;
use tk_listen::ListenExt;

let TIME_TO_WAIT_ON_ERROR = Duration::from_millis(100);
let MAX_SIMULTANEOUS_CONNECTIONS = 1000;

let mut lp = Core::new().unwrap();
let h2 = lp.handle(); // handle passed to sleep_on_error
let listener = TcpListener::bind(&addr, &lp.handle()).unwrap();
lp.run(
    listener.incoming()
    .sleep_on_error(TIME_TO_WAIT_ON_ERROR, &h2)
    .map(move |(mut socket, _addr)| {
         // Your future is here:
         Proto::new(socket)
         // Errors should not pass silently
         // common idea is to log them
         .map_err(|e| error!("Protocol error: {}", e))
    })
    .listen(MAX_SIMULTANEOUS_CONNECTIONS)
).unwrap(); // stream doesn't end in this case

Example With Listener Shutdown

Because tk-listen works as a combinator trait, you can easily add things, like shutdown:

let (tx, rx) = oneshot::channel::<()>();
lp.run(
    listener.incoming()
    .sleep_on_error(TIME_TO_WAIT_ON_ERROR, &h2)
    .map(move |(mut socket, _addr)| {
         // Your future is here:
         Proto::new(socket)
         // Errors should not pass silently
         // common idea is to log them
         .map_err(|e| error!("Protocol error: {}", e))
    })
    .listen(MAX_SIMULTANEOUS_CONNECTIONS)
    // Finish as soon as a message arrives on tx or tx is dropped
    .select(rx.then(|_| Ok(())))
)

Now the listener shuts down either when tx is dropped or when you send a message via tx.

This is a "force shutdown", meaning it closes all active connections immediately. It's also possible to stop accepting by closing the original stream (e.g. using take_while) and then wait until all connections shut down gracefully.
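
To make the graceful variant concrete, here is a thread-based analogy that uses only the standard library. It is a sketch under stated assumptions, not tk-listen code: serve_until is a hypothetical helper, a channel Receiver stands in for the stream of accepted sockets, and the keep_going predicate plays the role that take_while plays on a stream.

```rust
use std::sync::mpsc::Receiver;
use std::sync::Arc;
use std::thread;

/// Accepts items from `incoming` until `keep_going` returns false, then
/// waits for every handler that was already started. Returns the number
/// of accepted items.
fn serve_until<T, P, H>(incoming: Receiver<T>, mut keep_going: P, handler: H) -> usize
where
    T: Send + 'static,
    P: FnMut(&T) -> bool,
    H: Fn(T) + Send + Sync + 'static,
{
    let handler = Arc::new(handler);
    let mut workers = Vec::new();
    for conn in incoming {
        if !keep_going(&conn) {
            break; // stop accepting; the rejected item is dropped
        }
        let handler = Arc::clone(&handler);
        workers.push(thread::spawn(move || handler(conn)));
    }
    let accepted = workers.len();
    // Graceful part: let in-flight handlers run to completion.
    for worker in workers {
        worker.join().expect("handler panicked");
    }
    accepted
}
```

A force shutdown corresponds to returning without joining the workers. In the futures-based version the same distinction is between dropping the whole listen future (as select does) and ending only the input stream.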

Structs

Listen

A structure returned by ListenExt::listen

SleepOnError

A structure returned by ListenExt::sleep_on_error

Traits

ListenExt

An extension trait that provides necessary combinators for turning a stream of accept() events into a full-featured connection listener