//! Pubsub serving utils for [`Router`]s.
//!
//! This module provides pubsub functionality for serving [`Router`]s over
//! various connection types. Built-in support is provided for IPC and
//! Websockets, and a trait system is provided for custom connection types.
//!
//! ## Overview
//!
//! The pubsub module provides a way to serve a [`Router`] over pubsub
//! connections like IPC and Websockets.
//!
//! ## Usage
//!
//! Typically users want to use a [`Connect`] implementor to create a server
//! using [`Connect::serve`]. This will create a [`Listener`]. [`Listener`]s
//! manage accepting client connections, and spawn tasks to route requests for
//! each connection.
//!
//! ### Advanced Usage
//!
//! #### Backpressure and buffer saturation
//!
//! Backpressure is managed by a notification buffer. This buffer is allocated
//! on a per-client basis and holds both notifications and responses. The
//! buffer fills when the client is not reading from the connection, and the
//! server stops processing requests from that client until the buffer has
//! room.
//!
//! The buffer size is set by the [`Connect::notification_buffer_size`] method,
//! and defaults to [`DEFAULT_NOTIFICATION_BUFFER_PER_CLIENT`]. It is
//! recommended to set this value based on the expected size of notifications
//! and responses in your RPC server, as setting it too high may lead to server
//! resource exhaustion.
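//!
//! As a rough aid for choosing a value, the worst-case memory held in
//! notification buffers can be estimated as clients * buffer slots * item
//! size. The sketch below is illustrative arithmetic only: the function name
//! and its parameters are assumptions, not part of this crate, and real item
//! sizes vary per message.
//!
//! ```rust
//! /// Hypothetical helper: upper bound on bytes queued across all clients,
//! /// assuming every buffer is full and every item has the same size.
//! fn worst_case_buffered_bytes(clients: u64, buffer_size: u64, item_bytes: u64) -> u64 {
//!     clients
//!         .saturating_mul(buffer_size)
//!         .saturating_mul(item_bytes)
//! }
//! ```
//!
//! For example, with assumed figures of 1,000 clients, 16 slots per client,
//! and 4 KiB items, `worst_case_buffered_bytes(1_000, 16, 4 * 1024)` gives
//! 65,536,000 bytes (62.5 MiB).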
//!
//! The server contains two tasks per connection: a `RouteTask` and a
//! `WriteTask`. The `WriteTask` is responsible for notifying clients by pulling
//! items from the notification buffer, and writing them to the connection. The
//! `RouteTask` is responsible for reading requests from the connection, and
//! spawning tasks to handle each request. The `RouteTask` will always invoke
//! [`Sender::reserve_owned`] before invoking a [`Handler`]. This ensures that
//! the server will not process requests if the notification buffer is full.
//!
//! When [`Handler`]s send notifications via the [`HandlerCtx`], they SHOULD
//! `await` on [`Sender::send`]. This queues them for access to the notification
//! buffer. If the buffer is full, the `await` will suspend the handler until
//! the buffer has room. However, if many handlers are filling the buffer, this
//! will ALSO stop the `RouteTask` from reading further requests from the
//! connection. [`Handler`] implementers should be aware of this.
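//!
//! The saturation behavior described above can be illustrated with a bounded
//! channel. The sketch below is a std-only analogy that uses
//! [`std::sync::mpsc::sync_channel`] in place of the tokio channel used by
//! the server. The function names and the capacity of 2 are assumptions chosen
//! for illustration.
//!
//! ```rust
//! use std::sync::mpsc::{sync_channel, SyncSender, TrySendError};
//!
//! /// Tries to queue up to `n` items and returns how many were accepted
//! /// before the bounded buffer reported that it was full.
//! fn fill_until_full(tx: &SyncSender<u32>, n: u32) -> u32 {
//!     let mut accepted = 0;
//!     for i in 0..n {
//!         match tx.try_send(i) {
//!             Ok(()) => accepted += 1,
//!             // Buffer is saturated: a blocking `send` would wait here.
//!             Err(TrySendError::Full(_)) => break,
//!             // The reader is gone; nothing more can be queued.
//!             Err(TrySendError::Disconnected(_)) => break,
//!         }
//!     }
//!     accepted
//! }
//!
//! /// Fills a 2-slot buffer, lets the "client" read one item, then fills
//! /// again. Returns the number of items accepted in each round.
//! fn demo() -> (u32, u32) {
//!     let (tx, rx) = sync_channel::<u32>(2);
//!     let first = fill_until_full(&tx, 5);
//!     // The reader drains a single item, freeing exactly one slot.
//!     rx.recv().unwrap();
//!     let second = fill_until_full(&tx, 5);
//!     (first, second)
//! }
//! ```
//!
//! In this analogy a slow reader limits how much the producers can queue,
//! which is the same pressure a slow client exerts on handlers and on the
//! `RouteTask`.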
//!
//! #### Custom Connector
//!
//! [`Connect`] has been implemented for
//! [`interprocess::local_socket::ListenerOptions`] (producing a
//! [`interprocess::local_socket::tokio::Listener`]),
//! and for [`SocketAddr`] (producing a [`TcpListener`]).
//!
//! Custom [`Connect`] implementors can configure the listener in any way they
//! need. This is useful for (e.g.) configuring network or security policies on
//! the inbound TLS connection.
//!
//! #### Custom Listener
//!
//! If you need more control over the server, you can create your own
//! [`Listener`]. This is useful if you need to customize the connection
//! handling, or if you need to use a different connection type.
//!
//! [`Listener`]'s associated stream and sink types are used to read requests
//! and write responses. These types must implement [`JsonReqStream`] and
//! [`JsonSink`] respectively.
//!
//! ## Internal Structure
//!
//! There are 3 tasks:
//! - `ListenerTask` - listens for new connections, accepts, and spawns
//!   `RouteTask` and `WriteTask` for each. A listener task is spawned for each
//!   style of connection (e.g. IPC or Websockets).
//! - `RouteTask` - Reads requests from an inbound connection, and spawns a
//!   tokio task for each request. There is 1 `RouteTask` per connection.
//! - `WriteTask` - Manages outbound connections, receives responses from the
//!   router, and writes responses to the relevant connection. There is 1
//!   `WriteTask` per connection.
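//!
//! The per-connection part of this structure can be sketched with OS threads
//! and a bounded std channel. This is a simplified analogy, not the crate's
//! implementation: the real tasks are tokio tasks, and `run_pipeline`, the
//! `String` request and response types, and the capacity of 4 are assumptions
//! made for illustration. The `ListenerTask` is omitted, so the sketch models
//! a single connection.
//!
//! ```rust
//! use std::sync::mpsc::sync_channel;
//! use std::thread;
//!
//! /// Runs a toy route/write pipeline over `requests` and returns the
//! /// responses that the writer "wrote", sorted for determinism.
//! fn run_pipeline(requests: Vec<String>) -> Vec<String> {
//!     // Bounded per-connection buffer shared by all handlers.
//!     let (resp_tx, resp_rx) = sync_channel::<String>(4);
//!
//!     // "WriteTask": drains the buffer and writes to the connection.
//!     // A `Vec` stands in for the socket.
//!     let writer = thread::spawn(move || {
//!         let mut written = Vec::new();
//!         for item in resp_rx {
//!             written.push(item);
//!         }
//!         written
//!     });
//!
//!     // "RouteTask": reads requests and spawns one handler per request.
//!     let router = thread::spawn(move || {
//!         let handlers: Vec<_> = requests
//!             .into_iter()
//!             .map(|req| {
//!                 let tx = resp_tx.clone();
//!                 thread::spawn(move || {
//!                     // Blocks while the buffer is full.
//!                     tx.send(format!("response to {req}")).unwrap();
//!                 })
//!             })
//!             .collect();
//!         for handler in handlers {
//!             handler.join().unwrap();
//!         }
//!         // `resp_tx` is dropped here, which lets the writer finish.
//!     });
//!
//!     router.join().unwrap();
//!     let mut written = writer.join().unwrap();
//!     // Handlers finish in no particular order, so sort before returning.
//!     written.sort();
//!     written
//! }
//! ```
//!
//! Calling `run_pipeline` with the requests `"a"` and `"b"` returns
//! `"response to a"` and `"response to b"`, each written exactly once by the
//! single writer.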
//!
//! [`Router`]: crate::Router
//! [`SocketAddr`]: std::net::SocketAddr
//! [`TcpListener`]: tokio::net::TcpListener
//! [`Sender::reserve_owned`]: tokio::sync::mpsc::Sender::reserve_owned
//! [`Sender::send`]: tokio::sync::mpsc::Sender::send
//! [`Handler`]: crate::Handler
//! [`HandlerCtx`]: crate::HandlerCtx
// Re-exported for use in tests
pub use ReadJsonStream;
/// IPC support via interprocess local sockets.
pub use WriteItem;
pub use ;
pub use ServerShutdown;
pub use r#trait::;
pub use ;