1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
#![allow(dead_code)]
use crate::message::Msg;
use crate::runtime::mailbox::MailboxSender as SessionCommandMailboxSender;
use crate::socket::connection_iface::ISocketConnection;
use crate::{error::ZmqError, Blob};
use std::fmt;
#[cfg(feature = "io-uring")]
use std::os::unix::io::RawFd;
use std::sync::Arc;
use fibre::mpmc::{AsyncReceiver, AsyncSender};
use fibre::oneshot;
use tokio::task::Id as TaskId;
/// Categorises the kinds of actor tasks that exist in the system.
///
/// The value is carried by `ActorStarted` and `ActorStopping` events so that receivers
/// can tell which sort of actor an event refers to.
#[derive(Clone, Copy, Debug, Eq, Hash, PartialEq)]
pub enum ActorType {
    /// Main actor of a socket; manages the socket's state and its children
    /// (e.g., Listeners, Sessions).
    SocketCore,
    /// Command-loop actor of a Listener (e.g., a TCP or IPC listener).
    Listener,
    /// Task that accepts new connections on behalf of a Listener.
    AcceptLoop,
    /// Actor that manages a ZMTP session over an established connection.
    Session,
    /// Task that reads messages from an inter-actor pipe; used specifically for
    /// inproc connections.
    PipeReader,
    /// Task that establishes an outgoing connection (e.g., a TCP or IPC connector).
    Connecter,
    /// Dedicated task inside ContextInner that manages the WaitGroup based on events
    /// received from the EventBus.
    ContextListener,
}
/// Events broadcast system-wide or within a socket's actor tree via the EventBus.
/// These events are used for coordination and lifecycle management.
///
/// Only `Clone` is derived; `Debug` is implemented by hand elsewhere in this file.
#[derive(Clone)]
pub enum SystemEvent {
/// Indicates the entire context is terminating. All actors should react by shutting down.
/// Published by `ContextInner::shutdown`.
ContextTerminating,
/// Indicates a specific socket (identified by `socket_id`) is closing.
/// Its child actors (Listeners, Sessions, etc.) should react by shutting down.
/// Published by `SocketCore` when its shutdown is initiated.
SocketClosing {
/// The unique handle ID of the `SocketCore` that is closing.
socket_id: usize,
},
/// Published by the spawner of an actor *after* the actor task is successfully launched.
/// This event is primarily used by the `ContextListener` to increment the `WaitGroup` count,
/// ensuring proper tracking of active actors for context termination.
ActorStarted {
/// The unique handle ID assigned to the newly started actor task.
handle_id: usize,
/// The type of the actor task (e.g., `ActorType::Session` or `ActorType::Listener`).
actor_type: ActorType,
/// The handle ID of the parent actor that spawned this one, if applicable.
/// `None` for top-level actors like `SocketCore` or `ContextListener`.
parent_id: Option<usize>,
},
/// Published by an actor task itself just before it terminates (either cleanly or due to an error).
/// This event is primarily used by the `ContextListener` to decrement the `WaitGroup` count.
ActorStopping {
/// The unique handle ID of the actor task that is stopping.
handle_id: usize,
/// The type of the actor task that is stopping.
actor_type: ActorType,
/// The handle ID of the stopping actor's parent, if it has one.
/// Presumably the same value that was reported as `parent_id` in `ActorStarted` (confirm with publishers).
parent_id: Option<usize>,
/// Optional URI associated with the actor, e.g., for a Session or Listener.
endpoint_uri: Option<String>,
/// The `ZmqError` that made the actor stop, if it stopped due to an error.
/// Note that this is the structured error value, not a message string.
error: Option<ZmqError>,
},
/// Published by a Listener's accept loop or a Connecter task when a new network
/// connection is fully established and its associated Session actor is ready.
/// The parent `SocketCore` (identified by `parent_core_id`) listens for this event.
NewConnectionEstablished {
/// The handle ID of the parent `SocketCore` that owns the Listener/Connecter.
parent_core_id: usize,
/// The actual network endpoint URI of the established connection (e.g., `tcp://<peer_ip>:<peer_port>`).
endpoint_uri: String,
/// The original target endpoint URI requested by the user for outgoing connections.
/// For listeners, this is usually the same as `endpoint_uri`.
target_endpoint_uri: String,
/// The actual interface SocketCore uses to send messages and close the connection.
/// Shared behind an `Arc`, so cloning the event does not duplicate the connection object.
connection_iface: Option<Arc<dyn ISocketConnection>>,
/// Describes the management model (Session actor or Uring FD).
/// SocketCore uses this to know *how* to expect incoming messages.
interaction_model: ConnectionInteractionModel,
/// A unique identifier for the spawned Session task (e.g., derived from `JoinHandle::id()`).
/// Used for tracking if needed, as `JoinHandle` itself is not `Clone`.
managing_actor_task_id: Option<TaskId>,
},
/// Published by a `SessionBase` actor after its `ZmtpEngineCore` completes the handshake
/// and establishes the peer's ZMTP identity.
/// The parent `SocketCore` listens for this event to update its pattern logic (e.g., ROUTER map).
PeerIdentityEstablished {
/// The handle ID of the parent `SocketCore` this session belongs to.
parent_core_id: usize,
/// The pipe ID from the `SocketCore`'s perspective (Core's read ID for this session's pipe).
/// This is the `pipe_write_id` given to the Session in `Command::AttachPipe`.
connection_identifier: usize,
/// The ZMTP identity of the peer, if established.
/// This comes from `ZmtpEngineConfig::routing_id` of the peer, sent in its READY command,
/// or potentially from a security mechanism.
peer_identity: Option<Blob>,
},
/// Published by a Connecter task when a connection attempt fails definitively
/// (e.g., after retries or due to a non-recoverable error).
/// The parent `SocketCore` (identified by `parent_core_id`) listens for this event.
ConnectionAttemptFailed {
/// The handle ID of the parent `SocketCore` that owns the Connecter.
parent_core_id: usize,
/// The target endpoint URI that the connection attempt was made to.
target_endpoint_uri: String,
/// The structured error that caused the connection failure.
error: ZmqError,
},
/// Published by an `inproc` connector's `SocketCore` to request a connection
/// from an `inproc` binder `SocketCore`. The binder listens for events
/// matching its `target_inproc_name`.
InprocBindingRequest {
/// The logical name of the inproc endpoint to connect to (e.g., "my-service").
target_inproc_name: String,
/// The URI of the connector socket, for logging or identification purposes.
connector_uri: String,
/// The channel sender the Binder uses to send messages TO the Connector.
binder_pipe_tx_to_connector: AsyncSender<Vec<Msg>>,
/// The channel receiver the Binder uses to get messages FROM the Connector.
binder_pipe_rx_from_connector: AsyncReceiver<Vec<Msg>>,
/// The ID the connector uses to write messages to the binder.
connector_pipe_write_id: usize,
/// The ID the connector uses to read messages from the binder.
connector_pipe_read_id: usize,
/// A oneshot sender for the binder to reply with `Ok(())` if the connection
/// is accepted, or `Err(ZmqError)` if rejected.
/// Note (from the original author): a `Result<(), ZmqError>` payload is usable here because
/// `oneshot::Sender` itself does not require its payload type to be `Clone`.
reply_tx: oneshot::Sender<Result<(), ZmqError>>,
},
/// Published by an `inproc` connector's `SocketCore` when it closes its side
/// of an established inproc connection. This notifies the binder `SocketCore`
/// (identified by `target_inproc_name`) to clean up its corresponding pipe ends.
InprocPipePeerClosed {
/// The logical name of the inproc binder being notified.
target_inproc_name: String,
/// The pipe ID from the perspective of the *closing connector's read pipe*.
/// The binder uses this to identify which of its write pipes (to the connector)
/// should be closed and cleaned up.
closed_by_connector_pipe_read_id: usize,
},
}
/// Hand-written `Debug` for [`SystemEvent`].
///
/// The implementation is manual so that payloads without a useful textual form can be
/// summarised or left out:
/// - `NewConnectionEstablished::connection_iface` is reported only as a presence flag
///   under the key `connection_iface_is_some`.
/// - `InprocBindingRequest` prints its name, URI and pipe IDs; the two channel ends and
///   the reply sender are omitted, which the trailing `..` in the output indicates.
///
/// Every other field of every variant is printed with its own `Debug` implementation.
impl fmt::Debug for SystemEvent {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            SystemEvent::ContextTerminating => f.write_str("ContextTerminating"),
            SystemEvent::SocketClosing { socket_id } => f
                .debug_struct("SocketClosing")
                .field("socket_id", socket_id)
                .finish(),
            SystemEvent::ActorStarted {
                handle_id,
                actor_type,
                parent_id,
            } => f
                .debug_struct("ActorStarted")
                .field("handle_id", handle_id)
                .field("actor_type", actor_type)
                .field("parent_id", parent_id)
                .finish(),
            SystemEvent::ActorStopping {
                handle_id,
                actor_type,
                endpoint_uri,
                parent_id,
                error,
            } => f
                .debug_struct("ActorStopping")
                .field("handle_id", handle_id)
                .field("actor_type", actor_type)
                .field("parent_id", parent_id)
                .field("endpoint_uri", endpoint_uri)
                .field("error", error)
                .finish(),
            SystemEvent::NewConnectionEstablished {
                parent_core_id,
                endpoint_uri,
                target_endpoint_uri,
                // Only the presence of the interface is printed, not the object itself.
                connection_iface,
                interaction_model,
                managing_actor_task_id,
            } => f
                .debug_struct("NewConnectionEstablished")
                .field("parent_core_id", parent_core_id)
                .field("endpoint_uri", endpoint_uri)
                .field("target_endpoint_uri", target_endpoint_uri)
                .field("connection_iface_is_some", &connection_iface.is_some())
                .field("interaction_model", interaction_model)
                .field("managing_actor_task_id", managing_actor_task_id)
                .finish(),
            SystemEvent::PeerIdentityEstablished {
                parent_core_id,
                connection_identifier,
                peer_identity,
            } => f
                .debug_struct("PeerIdentityEstablished")
                .field("parent_core_id", parent_core_id)
                .field("connection_identifier", connection_identifier)
                .field("peer_identity", peer_identity)
                .finish(),
            SystemEvent::ConnectionAttemptFailed {
                parent_core_id,
                target_endpoint_uri,
                error,
            } => f
                .debug_struct("ConnectionAttemptFailed")
                .field("parent_core_id", parent_core_id)
                .field("target_endpoint_uri", target_endpoint_uri)
                .field("error", error)
                .finish(),
            SystemEvent::InprocBindingRequest {
                target_inproc_name,
                connector_uri,
                connector_pipe_write_id,
                connector_pipe_read_id,
                // The channel ends and the reply sender are intentionally not printed.
                ..
            } => f
                .debug_struct("InprocBindingRequest")
                .field("target_inproc_name", target_inproc_name)
                .field("connector_uri", connector_uri)
                .field("connector_pipe_write_id", connector_pipe_write_id)
                .field("connector_pipe_read_id", connector_pipe_read_id)
                .finish_non_exhaustive(),
            SystemEvent::InprocPipePeerClosed {
                target_inproc_name,
                closed_by_connector_pipe_read_id,
            } => f
                .debug_struct("InprocPipePeerClosed")
                .field("target_inproc_name", target_inproc_name)
                .field(
                    "closed_by_connector_pipe_read_id",
                    closed_by_connector_pipe_read_id,
                )
                .finish(),
        }
    }
}
/// Describes how `SocketCore` interacts with an established connection.
///
/// Carried as the `interaction_model` field of `SystemEvent::NewConnectionEstablished`.
#[derive(Clone)] // Derivable because MailboxSender is Clone and RawFd is Copy.
pub enum ConnectionInteractionModel {
/// Connection is reached through a session actor's command mailbox
/// ("SCA" = SessionConnectionActor).
ViaSca {
/// Mailbox sender for the SCA (`SessionCommandMailboxSender` is this file's alias for
/// `crate::runtime::mailbox::MailboxSender`).
sca_mailbox: SessionCommandMailboxSender,
/// Handle ID of the SCA. Presumably the same kind of handle ID used in the actor
/// lifecycle events (confirm with the code that builds this value).
sca_handle_id: usize,
},
/// Connection is managed directly by the UringWorker using a RawFd.
#[cfg(feature = "io-uring")]
ViaUringFd {
/// Raw file descriptor of the connection.
fd: RawFd,
// SocketCore will get the UringWorker's op_tx from Context to send data/commands.
},
/// Stand-in for the variant above when the `io-uring` feature is disabled;
/// it carries only a unit placeholder field.
#[cfg(not(feature = "io-uring"))]
ViaUringFd { _fd_placeholder: () }, // Keeps the variant defined when the feature is disabled
}
impl fmt::Debug for ConnectionInteractionModel {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
ConnectionInteractionModel::ViaSca {
sca_mailbox,
sca_handle_id,
} => f
.debug_struct("ViaSca")
.field("sca_mailbox_closed", &sca_mailbox.is_closed())
.field("sca_handle_id", sca_handle_id)
.finish(),
#[cfg(feature = "io-uring")]
ConnectionInteractionModel::ViaUringFd { fd } => {
f.debug_struct("ViaUringFd").field("fd", fd).finish()
}
#[cfg(not(feature = "io-uring"))]
ConnectionInteractionModel::ViaUringFd { _fd_placeholder } => f
.debug_struct("ViaUringFd")
.field("_fd_placeholder", &())
.finish(),
}
}
}