liminal/protocol/frame.rs
1use super::{
2 causal::MessageId,
3 envelope::{MessageEnvelope, SchemaId},
4 error::ProtocolError,
5 version::ProtocolVersion,
6};
7
/// Size, in bytes, of the fixed header that starts every serialized frame.
///
/// Spelled out as the sum of the header fields: frame type (1) + flags (1) +
/// stream id (4) + payload length (4).
pub const HEADER_LEN: usize = 1 + 1 + 4 + 4;
10
/// Flag bit (bit 0) marking a [`Frame::ConversationMessage`] as the request half
/// of a request-reply round trip.
///
/// Client side: set the bit on the request frame. Server side: once the message
/// has been delivered to the conversation participant, the participant's reply is
/// drained and sent back as a `ConversationMessage` carrying the same
/// `conversation_id` (the correlation key) and this same bit.
///
/// When the bit is absent the message keeps its pre-existing fire-and-forget
/// semantics: the server stays silent on success. Because the bit lives in the
/// header `flags` byte, which the codec already round-trips, no wire-format
/// change is required.
pub const CONVERSATION_REPLY_REQUESTED_FLAG: u8 = 1 << 0;
23
/// Flag bit (bit 1) marking a [`Frame::Publish`] whose body ends with an
/// idempotency-key string field (the dedup-on-delivery key).
///
/// With the bit set, the server consults its dedup cache keyed by the trailing
/// idempotency key, so re-publishes of the same key reach subscribers AT MOST
/// ONCE. With the bit clear, the pre-existing wire layout is kept EXACTLY: the
/// trailing field is neither written nor read, so a no-key publish is
/// byte-identical to before.
///
/// The bit is carried in the header `flags` byte, which the codec already
/// round-trips, so no header-format change is required (the 13-L0 precedent).
pub const PUBLISH_IDEMPOTENCY_KEY_FLAG: u8 = 1 << 1;
35
/// Flag bit (bit 0) marking a [`Frame::PublishAck`] as a GENUINE delivery ack:
/// at least one live subscriber accepted the published message on this publish.
///
/// This is separate from the backpressure `Accept`/`Defer`/`Reject` signal. Any
/// `PublishAck` means the publish was processed without error; only one carrying
/// this bit means a subscriber actually received the message. An ack WITHOUT the
/// bit means the publish succeeded but reached no subscriber (an empty channel,
/// or a duplicate suppressed by dedup-on-delivery), so a caller that needs a true
/// delivery ack can tell the two apart.
///
/// The bit rides the existing `flags` byte, so no wire-format change is required.
/// It has the same numeric value as [`CONVERSATION_REPLY_REQUESTED_FLAG`]; the two
/// are documented for different frame types.
pub const PUBLISH_DELIVERED_FLAG: u8 = 1 << 0;
48
/// Status byte prefixing a [`Frame::WorkerRegisterAck`] payload that signals the
/// registration was accepted (no further payload follows).
///
/// Semantically matches [`WorkerRegisterOutcome::Accepted`]; the code that maps
/// between the byte and the enum is not in this file.
pub(crate) const WORKER_REGISTER_ACK_ACCEPTED: u8 = 0x00;

/// Status byte prefixing a [`Frame::WorkerRegisterAck`] payload that signals the
/// registration was rejected (a length-prefixed reason string follows).
///
/// Semantically matches [`WorkerRegisterOutcome::Rejected`]; the code that maps
/// between the byte and the enum is not in this file.
pub(crate) const WORKER_REGISTER_ACK_REJECTED: u8 = 0x01;
56
/// Protocol frame categories and their stable wire discriminants.
///
/// The discriminant quoted on each variant is the byte produced by
/// `u8::from(FrameType)` and accepted by `FrameType::from(u8)`. Any byte outside
/// `0x01..=0x18` decodes to [`FrameType::Unknown`].
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum FrameType {
    /// Connection request (`0x01`).
    Connect,
    /// Successful connection response (`0x02`).
    ConnectAck,
    /// Failed connection response (`0x03`).
    ConnectError,
    /// Connection close notification (`0x04`).
    Disconnect,
    /// Channel subscription request (`0x05`).
    Subscribe,
    /// Successful subscription response (`0x06`).
    SubscribeAck,
    /// Failed subscription response (`0x07`).
    SubscribeError,
    /// Channel unsubscription request (`0x08`).
    Unsubscribe,
    /// Channel publish request (`0x09`).
    Publish,
    /// Successful publish response (`0x0A`).
    PublishAck,
    /// Failed publish response (`0x0B`).
    PublishError,
    /// Conversation lifecycle open (`0x0C`).
    ConversationOpen,
    /// Conversation message delivery (`0x0D`).
    ConversationMessage,
    /// Conversation lifecycle close (`0x0E`).
    ConversationClose,
    /// Conversation processing error (`0x0F`).
    ConversationError,
    /// In-band backpressure acceptance (`0x10`).
    Accept,
    /// In-band backpressure deferral (`0x11`).
    Defer,
    /// In-band backpressure rejection (`0x12`).
    Reject,
    /// Connection keepalive ping (`0x13`).
    Ping,
    /// Connection keepalive pong (`0x14`).
    Pong,
    /// Server-initiated push of an opaque payload to a connected client (`0x15`).
    Push,
    /// Client-initiated correlated reply to a server push (`0x16`).
    PushReply,
    /// Worker self-registration announcing identity and routing dimensions (`0x17`).
    WorkerRegister,
    /// Server acknowledgement of a worker registration, accepted or rejected (`0x18`).
    WorkerRegisterAck,
    /// Forward-compatible frame type not known to this implementation; the
    /// payload is the raw discriminant byte, which is written back unchanged.
    Unknown(u8),
}
111
112impl FrameType {
113 /// Return true when this frame type must appear on stream 0.
114 #[must_use]
115 pub const fn is_control(self) -> bool {
116 matches!(
117 self,
118 Self::Connect
119 | Self::ConnectAck
120 | Self::ConnectError
121 | Self::Disconnect
122 | Self::Ping
123 | Self::Pong
124 | Self::WorkerRegister
125 | Self::WorkerRegisterAck
126 )
127 }
128}
129
130impl From<u8> for FrameType {
131 fn from(value: u8) -> Self {
132 match value {
133 0x01 => Self::Connect,
134 0x02 => Self::ConnectAck,
135 0x03 => Self::ConnectError,
136 0x04 => Self::Disconnect,
137 0x05 => Self::Subscribe,
138 0x06 => Self::SubscribeAck,
139 0x07 => Self::SubscribeError,
140 0x08 => Self::Unsubscribe,
141 0x09 => Self::Publish,
142 0x0A => Self::PublishAck,
143 0x0B => Self::PublishError,
144 0x0C => Self::ConversationOpen,
145 0x0D => Self::ConversationMessage,
146 0x0E => Self::ConversationClose,
147 0x0F => Self::ConversationError,
148 0x10 => Self::Accept,
149 0x11 => Self::Defer,
150 0x12 => Self::Reject,
151 0x13 => Self::Ping,
152 0x14 => Self::Pong,
153 0x15 => Self::Push,
154 0x16 => Self::PushReply,
155 0x17 => Self::WorkerRegister,
156 0x18 => Self::WorkerRegisterAck,
157 unknown => Self::Unknown(unknown),
158 }
159 }
160}
161
162impl From<FrameType> for u8 {
163 fn from(value: FrameType) -> Self {
164 match value {
165 FrameType::Connect => 0x01,
166 FrameType::ConnectAck => 0x02,
167 FrameType::ConnectError => 0x03,
168 FrameType::Disconnect => 0x04,
169 FrameType::Subscribe => 0x05,
170 FrameType::SubscribeAck => 0x06,
171 FrameType::SubscribeError => 0x07,
172 FrameType::Unsubscribe => 0x08,
173 FrameType::Publish => 0x09,
174 FrameType::PublishAck => 0x0A,
175 FrameType::PublishError => 0x0B,
176 FrameType::ConversationOpen => 0x0C,
177 FrameType::ConversationMessage => 0x0D,
178 FrameType::ConversationClose => 0x0E,
179 FrameType::ConversationError => 0x0F,
180 FrameType::Accept => 0x10,
181 FrameType::Defer => 0x11,
182 FrameType::Reject => 0x12,
183 FrameType::Ping => 0x13,
184 FrameType::Pong => 0x14,
185 FrameType::Push => 0x15,
186 FrameType::PushReply => 0x16,
187 FrameType::WorkerRegister => 0x17,
188 FrameType::WorkerRegisterAck => 0x18,
189 FrameType::Unknown(type_id) => type_id,
190 }
191 }
192}
193
/// Fixed-size frame prefix: type, flags, stream identifier, and payload length.
///
/// The byte offsets quoted on the fields cover bytes `0..10`, which is the
/// [`HEADER_LEN`] every serialized header occupies.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub struct FrameHeader {
    /// Frame type read from or written to byte 0.
    pub frame_type: FrameType,
    /// Frame flags read from or written to byte 1.
    pub flags: u8,
    /// Stream identifier read from or written to bytes 2..6.
    pub stream_id: u32,
    /// Payload length read from or written to bytes 6..10.
    pub payload_length: u32,
}

impl FrameHeader {
    /// Serialized header length in bytes; an alias of [`HEADER_LEN`].
    pub const WIRE_LEN: usize = HEADER_LEN;
}
211
/// Self-describing worker registration carried by [`Frame::WorkerRegister`].
///
/// A worker announces the routing dimensions it serves plus a stable identity so
/// the server can associate the worker with its connection and the application can
/// route work to it. `node` is optional locality (the routing model treats node as
/// an optional dimension); the codec encodes it with a presence byte rather than
/// flattening `None` to an empty string.
///
/// This is plain owned data: the type definition itself enforces no constraints
/// on its fields.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct WorkerRegistration {
    /// Namespaces this worker serves.
    pub namespaces: Vec<String>,
    /// Task queue this worker pulls from.
    pub task_queue: String,
    /// Optional node locality; `None` when the worker declares no node affinity.
    /// `None` and `Some(String::new())` are distinct values and stay distinct
    /// on the wire.
    pub node: Option<String>,
    /// Activity types this worker can execute.
    pub activity_types: Vec<String>,
    /// Stable worker identity.
    pub identity: String,
}
232
/// Outcome of a worker registration, carried by [`Frame::WorkerRegisterAck`].
///
/// On the wire the ack payload starts with a status byte
/// ([`WORKER_REGISTER_ACK_ACCEPTED`] or [`WORKER_REGISTER_ACK_REJECTED`]); only
/// the rejected form is followed by further payload (the reason string).
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum WorkerRegisterOutcome {
    /// The server accepted the registration.
    Accepted,
    /// The server rejected the registration, with a human-readable reason.
    Rejected {
        /// Human-readable rejection reason surfaced to the worker.
        reason: String,
    },
}
244
/// A typed protocol frame body plus the header metadata required to encode it.
///
/// Every variant carries the header `flags` byte. The control variants
/// (`Connect`, `ConnectAck`, `ConnectError`, `Disconnect`, `Ping`, `Pong`,
/// `WorkerRegister`, `WorkerRegisterAck`) have no `stream_id` field: their stream
/// id is always reported as 0. All other variants store their stream id
/// explicitly.
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum Frame {
    /// Connection request carrying a supported version range and opaque auth token.
    Connect {
        /// Header flags byte.
        flags: u8,
        /// Lower bound of the supported protocol version range.
        min_version: ProtocolVersion,
        /// Upper bound of the supported protocol version range.
        max_version: ProtocolVersion,
        /// Opaque authentication token bytes.
        auth_token: Vec<u8>,
    },
    /// Connection success carrying the negotiated protocol version and server capabilities.
    ConnectAck {
        /// Header flags byte.
        flags: u8,
        /// Protocol version negotiated for the connection.
        selected_version: ProtocolVersion,
        /// Server capabilities value.
        capabilities: u32,
    },
    /// Connection failure carrying a numeric reason and optional message.
    ConnectError {
        /// Header flags byte.
        flags: u8,
        /// Numeric failure reason.
        reason_code: u16,
        /// Optional failure message.
        message: Option<String>,
    },
    /// Connection close notification with no payload.
    Disconnect { flags: u8 },
    /// Channel subscription request carrying a channel and accepted schema hashes.
    Subscribe {
        /// Header flags byte.
        flags: u8,
        /// Stream the frame travels on.
        stream_id: u32,
        /// Channel being subscribed to.
        channel: String,
        /// Schemas the subscriber accepts.
        accepted_schemas: Vec<SchemaId>,
        /// In-flight limit for the subscription; must be greater than zero
        /// (frame validation rejects 0).
        max_in_flight: u32,
    },
    /// Channel subscription success carrying a subscription id and selected schema.
    SubscribeAck {
        /// Header flags byte.
        flags: u8,
        /// Stream the frame travels on.
        stream_id: u32,
        /// Identifier of the new subscription.
        subscription_id: u64,
        /// Schema selected for the subscription.
        selected_schema: SchemaId,
    },
    /// Channel subscription failure carrying a numeric reason and optional message.
    SubscribeError {
        /// Header flags byte.
        flags: u8,
        /// Stream the frame travels on.
        stream_id: u32,
        /// Numeric failure reason.
        reason_code: u16,
        /// Optional failure message.
        message: Option<String>,
    },
    /// Channel unsubscription request carrying the subscription id.
    Unsubscribe {
        /// Header flags byte.
        flags: u8,
        /// Stream the frame travels on.
        stream_id: u32,
        /// Identifier of the subscription to end.
        subscription_id: u64,
    },
    /// Publish request carrying a channel and typed message envelope.
    ///
    /// `idempotency_key` is `Some` only when the [`PUBLISH_IDEMPOTENCY_KEY_FLAG`]
    /// flag bit is set; it is the dedup-on-delivery key the server feeds to its
    /// dedup cache. When `None` (and the flag clear) the frame is byte-identical
    /// on the wire to a pre-13-L1 publish.
    Publish {
        /// Header flags byte; carries [`PUBLISH_IDEMPOTENCY_KEY_FLAG`] when a
        /// key is present.
        flags: u8,
        /// Stream the frame travels on.
        stream_id: u32,
        /// Channel being published to.
        channel: String,
        /// Typed message envelope being published.
        envelope: MessageEnvelope,
        /// Dedup-on-delivery key, present only together with the flag bit.
        idempotency_key: Option<String>,
    },
    /// Publish success carrying the accepted message id.
    PublishAck {
        /// Header flags byte; [`PUBLISH_DELIVERED_FLAG`] marks a genuine
        /// delivery ack.
        flags: u8,
        /// Stream the frame travels on.
        stream_id: u32,
        /// Identifier of the accepted message.
        message_id: u64,
    },
    /// Publish failure carrying a numeric reason and optional message.
    PublishError {
        /// Header flags byte.
        flags: u8,
        /// Stream the frame travels on.
        stream_id: u32,
        /// Numeric failure reason.
        reason_code: u16,
        /// Optional failure message.
        message: Option<String>,
    },
    /// Conversation open carrying a conversation id and subject.
    ConversationOpen {
        /// Header flags byte.
        flags: u8,
        /// Stream the frame travels on.
        stream_id: u32,
        /// Identifier of the conversation being opened.
        conversation_id: u64,
        /// Conversation subject.
        subject: String,
    },
    /// Conversation message carrying a conversation id and typed message envelope.
    ConversationMessage {
        /// Header flags byte; [`CONVERSATION_REPLY_REQUESTED_FLAG`] requests a
        /// correlated reply.
        flags: u8,
        /// Stream the frame travels on.
        stream_id: u32,
        /// Identifier of the conversation; also the reply correlation key.
        conversation_id: u64,
        /// Typed message envelope being delivered.
        envelope: MessageEnvelope,
    },
    /// Conversation close carrying a conversation id and optional reason.
    ConversationClose {
        /// Header flags byte.
        flags: u8,
        /// Stream the frame travels on.
        stream_id: u32,
        /// Identifier of the conversation being closed.
        conversation_id: u64,
        /// Optional numeric close reason.
        reason_code: Option<u16>,
        /// Optional close message.
        message: Option<String>,
    },
    /// Conversation failure carrying a conversation id, numeric reason, and optional message.
    ConversationError {
        /// Header flags byte.
        flags: u8,
        /// Stream the frame travels on.
        stream_id: u32,
        /// Identifier of the failed conversation.
        conversation_id: u64,
        /// Numeric failure reason.
        reason_code: u16,
        /// Optional failure message.
        message: Option<String>,
    },
    /// Backpressure acceptance for a delivered message.
    Accept {
        /// Header flags byte.
        flags: u8,
        /// Stream the frame travels on.
        stream_id: u32,
        /// Identifier of the message this signal refers to.
        referenced_message_id: MessageId,
    },
    /// Backpressure deferral for a buffered message.
    Defer {
        /// Header flags byte.
        flags: u8,
        /// Stream the frame travels on.
        stream_id: u32,
        /// Identifier of the message this signal refers to.
        referenced_message_id: MessageId,
        /// Optional deferral reason.
        reason: Option<String>,
    },
    /// Backpressure rejection for a shed message.
    Reject {
        /// Header flags byte.
        flags: u8,
        /// Stream the frame travels on.
        stream_id: u32,
        /// Identifier of the message this signal refers to.
        referenced_message_id: MessageId,
        /// Optional rejection reason.
        reason: Option<String>,
    },
    /// Connection keepalive ping.
    Ping { flags: u8 },
    /// Connection keepalive pong.
    Pong { flags: u8 },
    /// Server-initiated push carrying a correlation id and an opaque payload.
    ///
    /// A server writes this frame to a connected client over the client's existing
    /// connection (server-to-client, the inverse of every other request frame). The
    /// `correlation_id` is the key the server uses to match the client's later
    /// [`Frame::PushReply`] back to this push; the `payload` is opaque application
    /// bytes the server hands the client. This is an application-stream frame, so
    /// `stream_id` is non-zero like a publish or conversation message.
    Push {
        /// Header flags byte.
        flags: u8,
        /// Stream the frame travels on (non-zero).
        stream_id: u32,
        /// Key used to match the later reply to this push.
        correlation_id: u64,
        /// Opaque application bytes.
        payload: Vec<u8>,
    },
    /// Client-initiated correlated reply to a [`Frame::Push`].
    ///
    /// After handling a pushed frame the client writes this back on the same
    /// connection, echoing the push's `correlation_id` so the server can match the
    /// reply to the originating push. The `payload` is the client's opaque answer.
    PushReply {
        /// Header flags byte.
        flags: u8,
        /// Stream the frame travels on.
        stream_id: u32,
        /// Correlation id echoed from the originating push.
        correlation_id: u64,
        /// The client's opaque answer bytes.
        payload: Vec<u8>,
    },
    /// Worker self-registration over an established connection.
    ///
    /// A worker sends this control frame (stream 0) after the connection
    /// handshake to announce its identity and routing dimensions. The server
    /// associates the registration with the connection's process id and surfaces
    /// it to the application via the connection-notifier hook, then answers with a
    /// [`Frame::WorkerRegisterAck`]. `node` is optional locality and is encoded
    /// with a presence byte, never flattened to an empty string.
    WorkerRegister {
        /// Header flags byte.
        flags: u8,
        /// Identity and routing dimensions announced by the worker.
        registration: WorkerRegistration,
    },
    /// Server acknowledgement of a [`Frame::WorkerRegister`].
    ///
    /// Carries the registration outcome: [`WorkerRegisterOutcome::Accepted`] when
    /// the server (and any configured notifier) accepted the worker, or
    /// [`WorkerRegisterOutcome::Rejected`] carrying a human-readable reason when it
    /// did not. The acknowledgement is synchronous so a worker never believes it is
    /// registered when the application rejected it.
    WorkerRegisterAck {
        /// Header flags byte.
        flags: u8,
        /// Whether the registration was accepted or rejected.
        outcome: WorkerRegisterOutcome,
    },
    /// Forward-compatible frame preserved after length-delimited skipping.
    Unknown {
        /// Raw frame-type discriminant byte that was not recognised.
        type_id: u8,
        /// Header flags byte.
        flags: u8,
        /// Stream id from the header; not validated for unknown types.
        stream_id: u32,
        /// Payload bytes, kept as-is.
        payload: Vec<u8>,
    },
}
433
434impl Frame {
435 /// Construct a ping frame, enforcing the stream-0 control-frame invariant.
436 ///
437 /// # Errors
438 ///
439 /// Returns [`ProtocolError::InvalidStream`] when `stream_id` is not zero.
440 pub fn new_ping(stream_id: u32) -> Result<Self, ProtocolError> {
441 validate_stream(FrameType::Ping, stream_id)?;
442 Ok(Self::Ping { flags: 0 })
443 }
444
445 /// Construct a publish frame, enforcing the non-zero application-stream invariant.
446 ///
447 /// # Errors
448 ///
449 /// Returns [`ProtocolError::InvalidStream`] when `stream_id` is zero.
450 pub fn new_publish(
451 stream_id: u32,
452 channel: impl Into<String>,
453 envelope: MessageEnvelope,
454 ) -> Result<Self, ProtocolError> {
455 validate_stream(FrameType::Publish, stream_id)?;
456 Ok(Self::Publish {
457 flags: 0,
458 stream_id,
459 channel: channel.into(),
460 envelope,
461 idempotency_key: None,
462 })
463 }
464
465 /// Construct a publish frame carrying an idempotency key for dedup-on-delivery.
466 ///
467 /// The returned frame has [`PUBLISH_IDEMPOTENCY_KEY_FLAG`] set and serializes
468 /// the trailing key field, so the server consults its dedup cache for this key.
469 ///
470 /// # Errors
471 ///
472 /// Returns [`ProtocolError::InvalidStream`] when `stream_id` is zero.
473 pub fn new_publish_with_idempotency_key(
474 stream_id: u32,
475 channel: impl Into<String>,
476 envelope: MessageEnvelope,
477 idempotency_key: impl Into<String>,
478 ) -> Result<Self, ProtocolError> {
479 validate_stream(FrameType::Publish, stream_id)?;
480 Ok(Self::Publish {
481 flags: PUBLISH_IDEMPOTENCY_KEY_FLAG,
482 stream_id,
483 channel: channel.into(),
484 envelope,
485 idempotency_key: Some(idempotency_key.into()),
486 })
487 }
488
489 /// Construct a server-to-client push frame, enforcing the non-zero
490 /// application-stream invariant.
491 ///
492 /// # Errors
493 ///
494 /// Returns [`ProtocolError::InvalidStream`] when `stream_id` is zero.
495 pub fn new_push(
496 stream_id: u32,
497 correlation_id: u64,
498 payload: Vec<u8>,
499 ) -> Result<Self, ProtocolError> {
500 validate_stream(FrameType::Push, stream_id)?;
501 Ok(Self::Push {
502 flags: 0,
503 stream_id,
504 correlation_id,
505 payload,
506 })
507 }
508
509 /// Construct a client-to-server push reply frame, echoing the correlation id of
510 /// the originating push, and enforcing the non-zero application-stream invariant.
511 ///
512 /// # Errors
513 ///
514 /// Returns [`ProtocolError::InvalidStream`] when `stream_id` is zero.
515 pub fn new_push_reply(
516 stream_id: u32,
517 correlation_id: u64,
518 payload: Vec<u8>,
519 ) -> Result<Self, ProtocolError> {
520 validate_stream(FrameType::PushReply, stream_id)?;
521 Ok(Self::PushReply {
522 flags: 0,
523 stream_id,
524 correlation_id,
525 payload,
526 })
527 }
528
529 /// Return the frame type represented by this frame body.
530 #[must_use]
531 pub const fn frame_type(&self) -> FrameType {
532 match self {
533 Self::Connect { .. } => FrameType::Connect,
534 Self::ConnectAck { .. } => FrameType::ConnectAck,
535 Self::ConnectError { .. } => FrameType::ConnectError,
536 Self::Disconnect { .. } => FrameType::Disconnect,
537 Self::Subscribe { .. } => FrameType::Subscribe,
538 Self::SubscribeAck { .. } => FrameType::SubscribeAck,
539 Self::SubscribeError { .. } => FrameType::SubscribeError,
540 Self::Unsubscribe { .. } => FrameType::Unsubscribe,
541 Self::Publish { .. } => FrameType::Publish,
542 Self::PublishAck { .. } => FrameType::PublishAck,
543 Self::PublishError { .. } => FrameType::PublishError,
544 Self::ConversationOpen { .. } => FrameType::ConversationOpen,
545 Self::ConversationMessage { .. } => FrameType::ConversationMessage,
546 Self::ConversationClose { .. } => FrameType::ConversationClose,
547 Self::ConversationError { .. } => FrameType::ConversationError,
548 Self::Accept { .. } => FrameType::Accept,
549 Self::Defer { .. } => FrameType::Defer,
550 Self::Reject { .. } => FrameType::Reject,
551 Self::Ping { .. } => FrameType::Ping,
552 Self::Pong { .. } => FrameType::Pong,
553 Self::Push { .. } => FrameType::Push,
554 Self::PushReply { .. } => FrameType::PushReply,
555 Self::WorkerRegister { .. } => FrameType::WorkerRegister,
556 Self::WorkerRegisterAck { .. } => FrameType::WorkerRegisterAck,
557 Self::Unknown { type_id, .. } => FrameType::Unknown(*type_id),
558 }
559 }
560
561 /// Return the frame flags stored in the fixed header.
562 #[must_use]
563 pub const fn flags(&self) -> u8 {
564 match self {
565 Self::Connect { flags, .. }
566 | Self::ConnectAck { flags, .. }
567 | Self::ConnectError { flags, .. }
568 | Self::Disconnect { flags, .. }
569 | Self::Subscribe { flags, .. }
570 | Self::SubscribeAck { flags, .. }
571 | Self::SubscribeError { flags, .. }
572 | Self::Unsubscribe { flags, .. }
573 | Self::Publish { flags, .. }
574 | Self::PublishAck { flags, .. }
575 | Self::PublishError { flags, .. }
576 | Self::ConversationOpen { flags, .. }
577 | Self::ConversationMessage { flags, .. }
578 | Self::ConversationClose { flags, .. }
579 | Self::ConversationError { flags, .. }
580 | Self::Accept { flags, .. }
581 | Self::Defer { flags, .. }
582 | Self::Reject { flags, .. }
583 | Self::Ping { flags }
584 | Self::Pong { flags }
585 | Self::Push { flags, .. }
586 | Self::PushReply { flags, .. }
587 | Self::WorkerRegister { flags, .. }
588 | Self::WorkerRegisterAck { flags, .. }
589 | Self::Unknown { flags, .. } => *flags,
590 }
591 }
592
593 /// Return the stream id stored in the fixed header.
594 #[must_use]
595 pub const fn stream_id(&self) -> u32 {
596 match self {
597 Self::Connect { .. }
598 | Self::ConnectAck { .. }
599 | Self::ConnectError { .. }
600 | Self::Disconnect { .. }
601 | Self::Ping { .. }
602 | Self::Pong { .. }
603 | Self::WorkerRegister { .. }
604 | Self::WorkerRegisterAck { .. } => 0,
605 Self::Subscribe { stream_id, .. }
606 | Self::SubscribeAck { stream_id, .. }
607 | Self::SubscribeError { stream_id, .. }
608 | Self::Unsubscribe { stream_id, .. }
609 | Self::Publish { stream_id, .. }
610 | Self::PublishAck { stream_id, .. }
611 | Self::PublishError { stream_id, .. }
612 | Self::ConversationOpen { stream_id, .. }
613 | Self::ConversationMessage { stream_id, .. }
614 | Self::ConversationClose { stream_id, .. }
615 | Self::ConversationError { stream_id, .. }
616 | Self::Accept { stream_id, .. }
617 | Self::Defer { stream_id, .. }
618 | Self::Reject { stream_id, .. }
619 | Self::Push { stream_id, .. }
620 | Self::PushReply { stream_id, .. }
621 | Self::Unknown { stream_id, .. } => *stream_id,
622 }
623 }
624
625 /// Validate the stream invariant for this frame.
626 pub(crate) fn validate(&self) -> Result<(), ProtocolError> {
627 validate_stream(self.frame_type(), self.stream_id())?;
628
629 if let Self::Subscribe { max_in_flight, .. } = self {
630 if *max_in_flight == 0 {
631 return Err(ProtocolError::codec(
632 "max_in_flight must be greater than zero",
633 ));
634 }
635 }
636
637 Ok(())
638 }
639}
640
641/// Validate stream placement for a known frame type.
642///
643/// # Errors
644///
645/// Returns [`ProtocolError::InvalidStream`] when `stream_id` is invalid for
646/// `frame_type`.
647pub fn validate_stream(frame_type: FrameType, stream_id: u32) -> Result<(), ProtocolError> {
648 if matches!(frame_type, FrameType::Unknown(_)) {
649 return Ok(());
650 }
651
652 let valid = if frame_type.is_control() {
653 stream_id == 0
654 } else {
655 stream_id >= 1
656 };
657
658 if valid {
659 Ok(())
660 } else {
661 Err(ProtocolError::invalid_stream(frame_type, stream_id))
662 }
663}
664
665#[cfg(test)]
666mod tests;