sansio/handler.rs
1//! # Handler Trait - Pipeline-Based Protocol Processing
2//!
3//! The [`Handler`] trait is the core building block for constructing protocol pipelines in sansio.
4//! Handlers process messages flowing through a pipeline, transforming them from one type to another
5//! while maintaining strict type safety.
6//!
7//! ## Overview
8//!
9//! A handler is a bidirectional message processor that sits in a pipeline:
10//! - **Inbound direction**: Receives `Rin`, processes it, produces `Rout` for the next handler
11//! - **Outbound direction**: Receives `Win`, processes it, produces `Wout` for the previous handler
12//!
13//! Each handler has a [`Context`] that allows it to:
14//! - Forward messages to the next handler in the pipeline
15//! - Poll messages from the next handler
16//! - Propagate events (timeouts, errors, EOF, etc.)
17//!
18//! ## Type Parameters
19//!
20//! Handlers have four associated types that define the transformation:
21//!
22//! - `Rin`: **R**ead **in**put - What this handler receives from the previous handler (inbound)
23//! - `Rout`: **R**ead **out**put - What this handler produces for the next handler (inbound)
24//! - `Win`: **W**rite **in**put - What this handler receives from the next handler (outbound)
25//! - `Wout`: **W**rite **out**put - What this handler produces for the previous handler (outbound)
26//!
27//! ## Handler Chain Type Flow
28//!
29//! ```text
30//! Pipeline: Handler1 -> Handler2 -> Handler3
31//!
32//! Inbound (handle_read):
33//! Handler1: Rin1 -> Rout1
34//! Handler2: Rin2 (= Rout1) -> Rout2
35//! Handler3: Rin3 (= Rout2) -> Rout3
36//!
37//! Outbound (poll_write):
38//! Handler3: Win3 -> Wout3
39//! Handler2: Win2 (= Wout3) -> Wout2
40//! Handler1: Win1 (= Wout2) -> Wout1
41//! ```
42//!
43//! ## Example: Echo Handler
44//!
45//! ```rust
46//! use sansio::{Handler, Context};
47//! use std::collections::VecDeque;
48//!
49//! /// Echo handler that receives strings and queues them to be sent back
50//! struct EchoHandler {
51//! output_queue: VecDeque<String>,
52//! }
53//!
54//! impl EchoHandler {
55//! fn new() -> Self {
56//! Self {
57//! output_queue: VecDeque::new(),
58//! }
59//! }
60//! }
61//!
62//! impl Handler for EchoHandler {
63//! // Input type: String
64//! type Rin = String;
65//! // Output type: Same as input (passthrough for inbound)
66//! type Rout = String;
67//! // Write input type: String
68//! type Win = String;
69//! // Write output type: String to send back
70//! type Wout = String;
71//!
72//! fn name(&self) -> &str {
73//! "EchoHandler"
74//! }
75//!
76//! fn handle_read(&mut self, ctx: &Context<Self::Rin, Self::Rout, Self::Win, Self::Wout>, msg: Self::Rin) {
77//! println!("Echoing: {}", msg);
78//! // Queue the message to be sent back
79//! self.output_queue.push_back(msg.clone());
80//! // Also forward to next handler (if any)
81//! ctx.fire_handle_read(msg);
82//! }
83//!
84//! fn poll_write(&mut self, ctx: &Context<Self::Rin, Self::Rout, Self::Win, Self::Wout>) -> Option<Self::Wout> {
85//! // First check if next handler has data
86//! if let Some(msg) = ctx.fire_poll_write() {
87//! return Some(msg);
88//! }
89//! // Then return our queued data
90//! self.output_queue.pop_front()
91//! }
92//! }
93//! ```
94//!
95//! ## Example: Protocol Codec Handler
96//!
97//! ```rust
98//! use sansio::{Handler, Context};
99//! use std::collections::VecDeque;
100//!
101//! # #[derive(Debug, Clone)]
102//! # struct Message { data: String }
103//!
104//! /// Decodes bytes into messages, encodes messages into bytes
105//! struct ProtocolCodec {
106//! read_buffer: Vec<u8>,
107//! write_queue: VecDeque<Vec<u8>>,
108//! }
109//!
110//! impl Handler for ProtocolCodec {
111//! // Inbound: receive bytes, produce Messages
112//! type Rin = Vec<u8>;
113//! type Rout = Message;
114//! // Outbound: receive Messages, produce bytes
115//! type Win = Message;
116//! type Wout = Vec<u8>;
117//!
118//! fn name(&self) -> &str {
119//! "ProtocolCodec"
120//! }
121//!
122//! fn handle_read(&mut self, ctx: &Context<Self::Rin, Self::Rout, Self::Win, Self::Wout>, msg: Self::Rin) {
123//! // Decode bytes into message
124//! self.read_buffer.extend_from_slice(&msg);
125//!
126//! // Simple example: assume we have a complete message
127//! if let Ok(s) = String::from_utf8(self.read_buffer.clone()) {
128//! let message = Message { data: s };
129//! self.read_buffer.clear();
130//! // Forward decoded message to next handler
131//! ctx.fire_handle_read(message);
132//! }
133//! }
134//!
135//! fn poll_write(&mut self, ctx: &Context<Self::Rin, Self::Rout, Self::Win, Self::Wout>) -> Option<Self::Wout> {
136//! // Poll for messages from next handler
137//! if let Some(msg) = ctx.fire_poll_write() {
138//! // Encode message to bytes
139//! self.write_queue.push_back(msg.data.into_bytes());
140//! }
141//! // Return encoded bytes
142//! self.write_queue.pop_front()
143//! }
144//! }
145//! ```
146//!
147//! ## Event Handling
148//!
149//! Handlers can also process lifecycle and error events:
150//!
151//! - **`transport_active`**: Connection established
152//! - **`transport_inactive`**: Connection closed
153//! - **`handle_timeout`**: Periodic timeout for time-based operations
154//! - **`handle_error`**: Error occurred in the pipeline
155//! - **`handle_eof`**: End-of-file/stream reached
156//! - **`handle_close`**: Explicit close request
157//!
158//! ## Context API
159//!
160//! The [`Context`] object provides methods to interact with the pipeline:
161//!
162//! - **`fire_handle_read(msg)`**: Forward a processed inbound message
163//! - **`fire_poll_write()`**: Poll for outbound messages from next handler
164//! - **`fire_transport_active()`**: Propagate transport active event
165//! - **`fire_handle_timeout(now)`**: Propagate timeout event
166//! - **`fire_handle_error(err)`**: Propagate error event
167//!
168//! ## Best Practices
169//!
170//! 1. **Single Responsibility**: Each handler should do one thing well
171//! 2. **Type Transformation**: Use type parameters to document transformations
172//! 3. **Event Propagation**: Always call `ctx.fire_*` to propagate events unless you explicitly want to stop them
173//! 4. **Error Handling**: Convert errors into appropriate types or propagate via `fire_handle_error`
174//! 5. **Buffering**: Use internal queues when transformations are one-to-many or many-to-one
175//!
176//! ## Comparison with Protocol Trait
177//!
178//! | Feature | Handler | Protocol |
179//! |---------|---------|----------|
180//! | Complexity | More complex | Simple |
181//! | Context | Has context object | No context |
182//! | Composition | Pipeline-based | Manual |
183//! | Best for | Protocol stacks | Single protocols |
184//!
185//! Use `Handler` when building complex protocol pipelines with multiple layers.
186//! Use [`Protocol`](crate::Protocol) when you want a simple, self-contained protocol.
187
188use crate::handler_internal::{ContextInternal, HandlerInternal};
189use log::{trace, warn};
190use std::any::Any;
191use std::cell::RefCell;
192use std::marker::PhantomData;
193use std::rc::Rc;
194use std::{error::Error, time::Instant};
195
196/// A handler processes messages flowing through a pipeline.
197///
198/// The `Handler` trait defines bidirectional message processing with strict type safety.
199/// Each handler transforms messages from one type to another, enabling complex protocol
200/// stacks to be built from simple, composable components.
201///
202/// # Type Parameters
203///
204/// - `Rin`: Read input - what this handler receives (inbound direction)
205/// - `Rout`: Read output - what this handler produces (inbound direction)
206/// - `Win`: Write input - what this handler receives (outbound direction)
207/// - `Wout`: Write output - what this handler produces (outbound direction)
208///
209/// # Design Pattern
210///
211/// Handlers follow the Intercepting Filter pattern:
212/// 1. Receive messages from one direction
213/// 2. Transform the message (decode, encode, validate, etc.)
214/// 3. Forward to the next handler in the pipeline
215/// 4. Optionally handle events (timeouts, errors, lifecycle)
216///
217/// # Example
218///
219/// See the [module-level documentation](index.html) for complete examples.
220pub trait Handler {
221 /// Read input message type.
222 ///
223 /// This is the type of messages this handler receives from the previous handler
224 /// in the pipeline (inbound direction).
225 type Rin: 'static;
226
227 /// Read output message type.
228 ///
229 /// This is the type of messages this handler produces for the next handler
230 /// in the pipeline (inbound direction).
231 type Rout: 'static;
232
233 /// Write input message type.
234 ///
235 /// This is the type of messages this handler receives from the next handler
236 /// in the pipeline (outbound direction).
237 type Win: 'static;
238
239 /// Write output message type.
240 ///
241 /// This is the type of messages this handler produces for the previous handler
242 /// in the pipeline (outbound direction).
243 type Wout: 'static;
244
245 /// Returns the handler's name.
246 ///
247 /// Used for debugging, logging, and error messages. Should be unique
248 /// within a pipeline to help identify which handler is being referenced.
249 ///
250 /// # Example
251 ///
252 /// ```rust
253 /// # use sansio::Handler;
254 /// # struct MyHandler;
255 /// # impl Handler for MyHandler {
256 /// # type Rin = String;
257 /// # type Rout = String;
258 /// # type Win = String;
259 /// # type Wout = String;
260 /// fn name(&self) -> &str {
261 /// "MyProtocolHandler"
262 /// }
263 /// # fn handle_read(&mut self, _ctx: &sansio::Context<Self::Rin, Self::Rout, Self::Win, Self::Wout>, _msg: Self::Rin) {}
264 /// # fn poll_write(&mut self, _ctx: &sansio::Context<Self::Rin, Self::Rout, Self::Win, Self::Wout>) -> Option<Self::Wout> { None }
265 /// # }
266 /// ```
267 fn name(&self) -> &str;
268
269 #[doc(hidden)]
270 #[allow(clippy::type_complexity)]
271 fn generate(
272 self,
273 ) -> (
274 String,
275 Rc<RefCell<dyn HandlerInternal>>,
276 Rc<RefCell<dyn ContextInternal>>,
277 )
278 where
279 Self: Sized + 'static,
280 {
281 let handler_name = self.name().to_owned();
282 let context: Context<Self::Rin, Self::Rout, Self::Win, Self::Wout> =
283 Context::new(self.name());
284
285 let handler: Box<
286 dyn Handler<Rin = Self::Rin, Rout = Self::Rout, Win = Self::Win, Wout = Self::Wout>,
287 > = Box::new(self);
288
289 (
290 handler_name,
291 Rc::new(RefCell::new(handler)),
292 Rc::new(RefCell::new(context)),
293 )
294 }
295
296 /// Called when the transport becomes active (connected).
297 ///
298 /// This event is triggered when a connection is established. Handlers can use this
299 /// to initialize state, start timers, or perform handshakes.
300 ///
301 /// The default implementation propagates the event to the next handler.
302 ///
303 /// # Parameters
304 ///
305 /// - `ctx`: Context for interacting with the pipeline
306 ///
307 /// # Example
308 ///
309 /// ```rust
310 /// # use sansio::{Handler, Context};
311 /// # struct MyHandler { connected: bool }
312 /// # impl Handler for MyHandler {
313 /// # type Rin = String;
314 /// # type Rout = String;
315 /// # type Win = String;
316 /// # type Wout = String;
317 /// # fn name(&self) -> &str { "MyHandler" }
318 /// fn transport_active(&mut self, ctx: &Context<Self::Rin, Self::Rout, Self::Win, Self::Wout>) {
319 /// self.connected = true;
320 /// println!("Connection established!");
321 /// // Propagate to next handler
322 /// ctx.fire_transport_active();
323 /// }
324 /// # fn handle_read(&mut self, _ctx: &Context<Self::Rin, Self::Rout, Self::Win, Self::Wout>, _msg: Self::Rin) {}
325 /// # fn poll_write(&mut self, _ctx: &Context<Self::Rin, Self::Rout, Self::Win, Self::Wout>) -> Option<Self::Wout> { None }
326 /// # }
327 /// ```
328 fn transport_active(&mut self, ctx: &Context<Self::Rin, Self::Rout, Self::Win, Self::Wout>) {
329 ctx.fire_transport_active();
330 }
331
332 /// Called when the transport becomes inactive (disconnected).
333 ///
334 /// This event is triggered when a connection is closed. Handlers can use this
335 /// to clean up resources, cancel timers, or save state.
336 ///
337 /// The default implementation propagates the event to the next handler.
338 ///
339 /// # Parameters
340 ///
341 /// - `ctx`: Context for interacting with the pipeline
342 fn transport_inactive(&mut self, ctx: &Context<Self::Rin, Self::Rout, Self::Win, Self::Wout>) {
343 ctx.fire_transport_inactive();
344 }
345
346 /// Handles an inbound message.
347 ///
348 /// This is the primary method for processing incoming data. The handler receives
349 /// a message of type `Rin`, processes it, and can forward transformed messages
350 /// of type `Rout` to the next handler via `ctx.fire_handle_read(msg)`.
351 ///
352 /// # Parameters
353 ///
354 /// - `ctx`: Context for interacting with the pipeline
355 /// - `msg`: The incoming message to process
356 ///
357 /// # Example
358 ///
359 /// ```rust
360 /// # use sansio::{Handler, Context};
361 /// # struct ToUpperHandler;
362 /// # impl Handler for ToUpperHandler {
363 /// # type Rin = String;
364 /// # type Rout = String;
365 /// # type Win = String;
366 /// # type Wout = String;
367 /// # fn name(&self) -> &str { "ToUpperHandler" }
368 /// fn handle_read(&mut self, ctx: &Context<Self::Rin, Self::Rout, Self::Win, Self::Wout>, msg: Self::Rin) {
369 /// // Transform the message
370 /// let transformed = msg.to_uppercase();
371 /// // Forward to next handler
372 /// ctx.fire_handle_read(transformed);
373 /// }
374 /// # fn poll_write(&mut self, _ctx: &Context<Self::Rin, Self::Rout, Self::Win, Self::Wout>) -> Option<Self::Wout> { None }
375 /// # }
376 /// ```
377 fn handle_read(
378 &mut self,
379 ctx: &Context<Self::Rin, Self::Rout, Self::Win, Self::Wout>,
380 msg: Self::Rin,
381 );
382
383 /// Polls for an outbound message.
384 ///
385 /// This method is called to retrieve messages that should be written to the transport.
386 /// Handlers typically:
387 /// 1. Poll the next handler via `ctx.fire_poll_write()`
388 /// 2. Transform received messages or generate their own
389 /// 3. Return messages of type `Wout`
390 ///
391 /// Returns `None` when no message is available.
392 ///
393 /// # Parameters
394 ///
395 /// - `ctx`: Context for interacting with the pipeline
396 ///
397 /// # Returns
398 ///
399 /// - `Some(Wout)`: A message ready to be written
400 /// - `None`: No message available
401 ///
402 /// # Example
403 ///
404 /// ```rust
405 /// # use sansio::{Handler, Context};
406 /// # use std::collections::VecDeque;
407 /// # struct BufferingHandler { queue: VecDeque<String> }
408 /// # impl Handler for BufferingHandler {
409 /// # type Rin = String;
410 /// # type Rout = String;
411 /// # type Win = String;
412 /// # type Wout = String;
413 /// # fn name(&self) -> &str { "BufferingHandler" }
414 /// fn poll_write(&mut self, ctx: &Context<Self::Rin, Self::Rout, Self::Win, Self::Wout>) -> Option<Self::Wout> {
415 /// // First check next handler
416 /// if let Some(msg) = ctx.fire_poll_write() {
417 /// return Some(msg);
418 /// }
419 /// // Then check our own queue
420 /// self.queue.pop_front()
421 /// }
422 /// # fn handle_read(&mut self, _ctx: &Context<Self::Rin, Self::Rout, Self::Win, Self::Wout>, _msg: Self::Rin) {}
423 /// # }
424 /// ```
425 fn poll_write(
426 &mut self,
427 ctx: &Context<Self::Rin, Self::Rout, Self::Win, Self::Wout>,
428 ) -> Option<Self::Wout>;
429
430 /// Handles a timeout event.
431 ///
432 /// Called periodically to allow handlers to perform time-based operations like:
433 /// - Sending keepalive messages
434 /// - Timing out pending operations
435 /// - Retransmitting lost packets
436 ///
437 /// The default implementation propagates the event to the next handler.
438 ///
439 /// # Parameters
440 ///
441 /// - `ctx`: Context for interacting with the pipeline
442 /// - `now`: The current timestamp
443 fn handle_timeout(
444 &mut self,
445 ctx: &Context<Self::Rin, Self::Rout, Self::Win, Self::Wout>,
446 now: Instant,
447 ) {
448 ctx.fire_handle_timeout(now);
449 }
450
451 /// Polls for the next timeout deadline.
452 ///
453 /// Handlers can update `eto` (earliest timeout) to indicate when they next
454 /// need `handle_timeout` to be called. The pipeline will call the handler
455 /// with the minimum timeout across all handlers.
456 ///
457 /// The default implementation propagates the poll to the next handler.
458 ///
459 /// # Parameters
460 ///
461 /// - `ctx`: Context for interacting with the pipeline
462 /// - `eto`: Mutable reference to the earliest timeout. Update this to request an earlier timeout.
463 ///
464 /// # Example
465 ///
466 /// ```rust
467 /// # use sansio::{Handler, Context};
468 /// # use std::time::{Instant, Duration};
469 /// # struct KeepaliveHandler { next_keepalive: Option<Instant> }
470 /// # impl Handler for KeepaliveHandler {
471 /// # type Rin = String;
472 /// # type Rout = String;
473 /// # type Win = String;
474 /// # type Wout = String;
475 /// # fn name(&self) -> &str { "KeepaliveHandler" }
476 /// fn poll_timeout(&mut self, ctx: &Context<Self::Rin, Self::Rout, Self::Win, Self::Wout>, eto: &mut Instant) {
477 /// // Request timeout for our keepalive
478 /// if let Some(deadline) = self.next_keepalive {
479 /// if deadline < *eto {
480 /// *eto = deadline;
481 /// }
482 /// }
483 /// // Also check next handler
484 /// ctx.fire_poll_timeout(eto);
485 /// }
486 /// # fn handle_read(&mut self, _ctx: &Context<Self::Rin, Self::Rout, Self::Win, Self::Wout>, _msg: Self::Rin) {}
487 /// # fn poll_write(&mut self, _ctx: &Context<Self::Rin, Self::Rout, Self::Win, Self::Wout>) -> Option<Self::Wout> { None }
488 /// # }
489 /// ```
490 fn poll_timeout(
491 &mut self,
492 ctx: &Context<Self::Rin, Self::Rout, Self::Win, Self::Wout>,
493 eto: &mut Instant,
494 ) {
495 ctx.fire_poll_timeout(eto);
496 }
497
498 /// Handles an end-of-file (EOF) event.
499 ///
500 /// Called when the input stream reaches EOF (e.g., remote peer closed their write side).
501 /// Handlers can use this to flush buffers or initiate graceful shutdown.
502 ///
503 /// The default implementation propagates the event to the next handler.
504 ///
505 /// # Parameters
506 ///
507 /// - `ctx`: Context for interacting with the pipeline
508 fn handle_eof(&mut self, ctx: &Context<Self::Rin, Self::Rout, Self::Win, Self::Wout>) {
509 ctx.fire_handle_eof();
510 }
511
512 /// Handles an error event.
513 ///
514 /// Called when an error occurs in the pipeline. Handlers can:
515 /// - Log the error
516 /// - Attempt recovery
517 /// - Transform the error
518 /// - Propagate it to the next handler
519 ///
520 /// The default implementation propagates the error to the next handler.
521 ///
522 /// # Parameters
523 ///
524 /// - `ctx`: Context for interacting with the pipeline
525 /// - `err`: The error that occurred
526 fn handle_error(
527 &mut self,
528 ctx: &Context<Self::Rin, Self::Rout, Self::Win, Self::Wout>,
529 err: Box<dyn Error>,
530 ) {
531 ctx.fire_handle_error(err);
532 }
533
534 /// Handles a close event.
535 ///
536 /// Called when the pipeline is being closed. Handlers should clean up resources,
537 /// flush buffers, and prepare for shutdown.
538 ///
539 /// The default implementation propagates the event to the next handler.
540 ///
541 /// # Parameters
542 ///
543 /// - `ctx`: Context for interacting with the pipeline
544 fn handle_close(&mut self, ctx: &Context<Self::Rin, Self::Rout, Self::Win, Self::Wout>) {
545 ctx.fire_handle_close();
546 }
547}
548
549impl<Rin: 'static, Rout: 'static, Win: 'static, Wout: 'static> HandlerInternal
550 for Box<dyn Handler<Rin = Rin, Rout = Rout, Win = Win, Wout = Wout>>
551{
552 fn transport_active_internal(&mut self, ctx: &dyn ContextInternal) {
553 if let Some(ctx) = ctx.as_any().downcast_ref::<Context<Rin, Rout, Win, Wout>>() {
554 self.transport_active(ctx);
555 } else {
556 panic!(
557 "ctx can't downcast_ref::<Context<Rin, Rout, Win, Wout>> in {} handler",
558 ctx.name()
559 );
560 }
561 }
562 fn transport_inactive_internal(&mut self, ctx: &dyn ContextInternal) {
563 if let Some(ctx) = ctx.as_any().downcast_ref::<Context<Rin, Rout, Win, Wout>>() {
564 self.transport_inactive(ctx);
565 } else {
566 panic!(
567 "ctx can't downcast_ref::<Context<Rin, Rout, Win, Wout>> in {} handler",
568 ctx.name()
569 );
570 }
571 }
572
573 fn handle_read_internal(&mut self, ctx: &dyn ContextInternal, msg: Box<dyn Any>) {
574 if let Some(ctx) = ctx.as_any().downcast_ref::<Context<Rin, Rout, Win, Wout>>() {
575 if let Ok(msg) = msg.downcast::<Rin>() {
576 self.handle_read(ctx, *msg);
577 } else {
578 panic!("msg can't downcast::<Rin> in {} handler", ctx.name());
579 }
580 } else {
581 panic!(
582 "ctx can't downcast::<Context<Rin, Rout, Win, Wout>> in {} handler",
583 ctx.name()
584 );
585 }
586 }
587 fn poll_write_internal(&mut self, ctx: &dyn ContextInternal) -> Option<Box<dyn Any>> {
588 if let Some(ctx) = ctx.as_any().downcast_ref::<Context<Rin, Rout, Win, Wout>>() {
589 if let Some(msg) = self.poll_write(ctx) {
590 Some(Box::new(msg))
591 } else {
592 None
593 }
594 } else {
595 panic!(
596 "ctx can't downcast_ref::<OutboundContext<Win, Wout>> in {} handler",
597 ctx.name()
598 );
599 }
600 }
601
602 fn handle_timeout_internal(&mut self, ctx: &dyn ContextInternal, now: Instant) {
603 if let Some(ctx) = ctx.as_any().downcast_ref::<Context<Rin, Rout, Win, Wout>>() {
604 self.handle_timeout(ctx, now);
605 } else {
606 panic!(
607 "ctx can't downcast_ref::<Context<Rin, Rout, Win, Wout>> in {} handler",
608 ctx.name()
609 );
610 }
611 }
612 fn poll_timeout_internal(&mut self, ctx: &dyn ContextInternal, eto: &mut Instant) {
613 if let Some(ctx) = ctx.as_any().downcast_ref::<Context<Rin, Rout, Win, Wout>>() {
614 self.poll_timeout(ctx, eto);
615 } else {
616 panic!(
617 "ctx can't downcast_ref::<Context<Rin, Rout, Win, Wout>> in {} handler",
618 ctx.name()
619 );
620 }
621 }
622
623 fn handle_eof_internal(&mut self, ctx: &dyn ContextInternal) {
624 if let Some(ctx) = ctx.as_any().downcast_ref::<Context<Rin, Rout, Win, Wout>>() {
625 self.handle_eof(ctx);
626 } else {
627 panic!(
628 "ctx can't downcast_ref::<Context<Rin, Rout, Win, Wout>> in {} handler",
629 ctx.name()
630 );
631 }
632 }
633 fn handle_error_internal(&mut self, ctx: &dyn ContextInternal, err: Box<dyn Error>) {
634 if let Some(ctx) = ctx.as_any().downcast_ref::<Context<Rin, Rout, Win, Wout>>() {
635 self.handle_error(ctx, err);
636 } else {
637 panic!(
638 "ctx can't downcast_ref::<Context<Rin, Rout, Win, Wout>> in {} handler",
639 ctx.name()
640 );
641 }
642 }
643 fn handle_close_internal(&mut self, ctx: &dyn ContextInternal) {
644 if let Some(ctx) = ctx.as_any().downcast_ref::<Context<Rin, Rout, Win, Wout>>() {
645 self.handle_close(ctx);
646 } else {
647 panic!(
648 "ctx can't downcast_ref::<OutboundContext<Win, Wout>> in {} handler",
649 ctx.name()
650 );
651 }
652 }
653}
654
/// A [`Handler`]'s gateway to the rest of the pipeline.
///
/// Every handler method receives a `Context`. Through it the handler forwards
/// messages, polls the next handler for data and propagates events along the
/// pipeline.
///
/// # Type Parameters
///
/// - `Rin`: Read input message type (what the current handler receives)
/// - `Rout`: Read output message type (what the current handler produces)
/// - `Win`: Write input message type (what the current handler receives for writing)
/// - `Wout`: Write output message type (what the current handler produces for writing)
///
/// # Responsibilities
///
/// The context serves two purposes:
///
/// 1. **Forward Operations**: push data/events to the next handler
///    - `fire_handle_read()`: Forward processed inbound messages
///    - `fire_transport_active()`: Propagate connection established event
///    - `fire_handle_timeout()`: Propagate timeout events
///
/// 2. **Pull Operations**: pull data from the next handler
///    - `fire_poll_write()`: Poll outbound messages from next handler
///    - `fire_poll_timeout()`: Poll timeout requirements from next handler
///
/// # Example
///
/// ```rust
/// use sansio::{Handler, Context};
///
/// struct MyHandler;
///
/// impl Handler for MyHandler {
///     type Rin = String;
///     type Rout = String;
///     type Win = String;
///     type Wout = String;
///
///     fn name(&self) -> &str {
///         "MyHandler"
///     }
///
///     fn handle_read(&mut self, ctx: &Context<Self::Rin, Self::Rout, Self::Win, Self::Wout>, msg: Self::Rin) {
///         // Use context to forward message
///         ctx.fire_handle_read(msg);
///     }
///
///     fn poll_write(&mut self, ctx: &Context<Self::Rin, Self::Rout, Self::Win, Self::Wout>) -> Option<Self::Wout> {
///         // Use context to poll next handler
///         ctx.fire_poll_write()
///     }
/// }
/// ```
pub struct Context<Rin, Rout, Win, Wout> {
    // Name of the handler this context belongs to; used in log and panic messages.
    name: String,

    // Links to the following pipeline stage. Both start out as `None` and are
    // installed through `set_next_context` / `set_next_handler`; the `fire_*`
    // methods only forward when both are present.
    next_context: Option<Rc<RefCell<dyn ContextInternal>>>,
    next_handler: Option<Rc<RefCell<dyn HandlerInternal>>>,

    // No message values are stored here; the marker only ties the four message
    // types to this context so the typed `fire_*` methods and downcasts can use them.
    phantom: PhantomData<(Rin, Rout, Win, Wout)>,
}
717
718impl<Rin: 'static, Rout: 'static, Win: 'static, Wout: 'static> Context<Rin, Rout, Win, Wout> {
719 /// Creates a new Context.
720 ///
721 /// Typically called internally by the pipeline when adding a handler.
722 /// You rarely need to create a Context manually.
723 ///
724 /// # Parameters
725 ///
726 /// - `name`: The name of the handler this context belongs to
727 pub fn new(name: &str) -> Self {
728 Self {
729 name: name.to_string(),
730
731 next_context: None,
732 next_handler: None,
733
734 phantom: PhantomData,
735 }
736 }
737
738 /// Propagates transport active event to the next handler.
739 ///
740 /// Call this to forward the "connection established" event through the pipeline.
741 /// This should be called from your [`Handler::transport_active`] implementation
742 /// unless you explicitly want to stop event propagation.
743 pub fn fire_transport_active(&self) {
744 if let (Some(next_handler), Some(next_context)) = (&self.next_handler, &self.next_context) {
745 let (mut next_handler, next_context) =
746 (next_handler.borrow_mut(), next_context.borrow());
747 next_handler.transport_active_internal(&*next_context);
748 }
749 }
750
751 /// Propagates transport inactive event to the next handler.
752 ///
753 /// Call this to forward the "connection closed" event through the pipeline.
754 /// This should be called from your [`Handler::transport_inactive`] implementation
755 /// unless you explicitly want to stop event propagation.
756 pub fn fire_transport_inactive(&self) {
757 if let (Some(next_handler), Some(next_context)) = (&self.next_handler, &self.next_context) {
758 let (mut next_handler, next_context) =
759 (next_handler.borrow_mut(), next_context.borrow());
760 next_handler.transport_inactive_internal(&*next_context);
761 }
762 }
763
764 /// Forwards an inbound message to the next handler.
765 ///
766 /// After processing a message in [`Handler::handle_read`], call this method
767 /// to pass the transformed message (`Rout`) to the next handler in the pipeline.
768 ///
769 /// # Parameters
770 ///
771 /// - `msg`: The processed message to forward
772 ///
773 /// # Example
774 ///
775 /// ```rust
776 /// # use sansio::{Handler, Context};
777 /// # struct DecoderHandler;
778 /// # impl Handler for DecoderHandler {
779 /// # type Rin = Vec<u8>;
780 /// # type Rout = String;
781 /// # type Win = String;
782 /// # type Wout = Vec<u8>;
783 /// # fn name(&self) -> &str { "DecoderHandler" }
784 /// fn handle_read(&mut self, ctx: &Context<Self::Rin, Self::Rout, Self::Win, Self::Wout>, msg: Self::Rin) {
785 /// // Decode bytes to string
786 /// if let Ok(decoded) = String::from_utf8(msg) {
787 /// // Forward decoded message
788 /// ctx.fire_handle_read(decoded);
789 /// }
790 /// }
791 /// # fn poll_write(&mut self, _ctx: &Context<Self::Rin, Self::Rout, Self::Win, Self::Wout>) -> Option<Self::Wout> { None }
792 /// # }
793 /// ```
794 pub fn fire_handle_read(&self, msg: Rout) {
795 if let (Some(next_handler), Some(next_context)) = (&self.next_handler, &self.next_context) {
796 let (mut next_handler, next_context) =
797 (next_handler.borrow_mut(), next_context.borrow());
798 next_handler.handle_read_internal(&*next_context, Box::new(msg));
799 } else {
800 warn!("handle_read reached end of pipeline");
801 }
802 }
803
804 /// Polls the next handler for an outbound message.
805 ///
806 /// Call this from [`Handler::poll_write`] to retrieve messages from the next
807 /// handler in the pipeline. Returns `Some(Win)` if the next handler has data
808 /// to send, or `None` otherwise.
809 ///
810 /// # Returns
811 ///
812 /// - `Some(Win)`: A message from the next handler
813 /// - `None`: No message available
814 ///
815 /// # Example
816 ///
817 /// ```rust
818 /// # use sansio::{Handler, Context};
819 /// # use std::collections::VecDeque;
820 /// # struct EncoderHandler { queue: VecDeque<Vec<u8>> }
821 /// # impl Handler for EncoderHandler {
822 /// # type Rin = String;
823 /// # type Rout = String;
824 /// # type Win = String;
825 /// # type Wout = Vec<u8>;
826 /// # fn name(&self) -> &str { "EncoderHandler" }
827 /// fn poll_write(&mut self, ctx: &Context<Self::Rin, Self::Rout, Self::Win, Self::Wout>) -> Option<Self::Wout> {
828 /// // Poll next handler first
829 /// if let Some(msg) = ctx.fire_poll_write() {
830 /// // Encode string to bytes
831 /// return Some(msg.into_bytes());
832 /// }
833 /// // Then return our own queued data
834 /// self.queue.pop_front()
835 /// }
836 /// # fn handle_read(&mut self, _ctx: &Context<Self::Rin, Self::Rout, Self::Win, Self::Wout>, _msg: Self::Rin) {}
837 /// # }
838 /// ```
839 pub fn fire_poll_write(&self) -> Option<Win> {
840 if let (Some(next_handler), Some(next_context)) = (&self.next_handler, &self.next_context) {
841 let (mut next_handler, next_context) =
842 (next_handler.borrow_mut(), next_context.borrow());
843 if let Some(msg) = next_handler.poll_write_internal(&*next_context) {
844 if let Ok(msg) = msg.downcast::<Win>() {
845 Some(*msg)
846 } else {
847 panic!(
848 "msg can't downcast::<Win> in {} handler",
849 next_context.name()
850 );
851 }
852 } else {
853 None
854 }
855 } else {
856 warn!("poll_write reached end of pipeline");
857 None
858 }
859 }
860
861 /// Propagates timeout event to the next handler.
862 ///
863 /// Call this from [`Handler::handle_timeout`] to forward the timeout event
864 /// through the pipeline.
865 ///
866 /// # Parameters
867 ///
868 /// - `now`: The current timestamp
869 pub fn fire_handle_timeout(&self, now: Instant) {
870 if let (Some(next_handler), Some(next_context)) = (&self.next_handler, &self.next_context) {
871 let (mut next_handler, next_context) =
872 (next_handler.borrow_mut(), next_context.borrow());
873 next_handler.handle_timeout_internal(&*next_context, now);
874 } else {
875 warn!("handle_timeout reached end of pipeline");
876 }
877 }
878
879 /// Polls the next handler for its timeout deadline.
880 ///
881 /// Call this from [`Handler::poll_timeout`] to allow the next handler to
882 /// update the earliest timeout deadline.
883 ///
884 /// # Parameters
885 ///
886 /// - `eto`: Mutable reference to the earliest timeout. The next handler may update this.
887 pub fn fire_poll_timeout(&self, eto: &mut Instant) {
888 if let (Some(next_handler), Some(next_context)) = (&self.next_handler, &self.next_context) {
889 let (mut next_handler, next_context) =
890 (next_handler.borrow_mut(), next_context.borrow());
891 next_handler.poll_timeout_internal(&*next_context, eto);
892 } else {
893 trace!("poll_timeout reached end of pipeline");
894 }
895 }
896
897 /// Propagates EOF event to the next handler.
898 ///
899 /// Call this from [`Handler::handle_eof`] to forward the end-of-file event
900 /// through the pipeline.
901 pub fn fire_handle_eof(&self) {
902 if let (Some(next_handler), Some(next_context)) = (&self.next_handler, &self.next_context) {
903 let (mut next_handler, next_context) =
904 (next_handler.borrow_mut(), next_context.borrow());
905 next_handler.handle_eof_internal(&*next_context);
906 } else {
907 warn!("handle_eof reached end of pipeline");
908 }
909 }
910
911 /// Propagates error event to the next handler.
912 ///
913 /// Call this from [`Handler::handle_error`] to forward the error through
914 /// the pipeline.
915 ///
916 /// # Parameters
917 ///
918 /// - `err`: The error to propagate
919 pub fn fire_handle_error(&self, err: Box<dyn Error>) {
920 if let (Some(next_handler), Some(next_context)) = (&self.next_handler, &self.next_context) {
921 let (mut next_handler, next_context) =
922 (next_handler.borrow_mut(), next_context.borrow());
923 next_handler.handle_error_internal(&*next_context, err);
924 } else {
925 warn!("handle_error reached end of pipeline");
926 }
927 }
928
929 /// Propagates close event to the next handler.
930 ///
931 /// Call this from [`Handler::handle_close`] to forward the close request
932 /// through the pipeline.
933 pub fn fire_handle_close(&self) {
934 if let (Some(next_handler), Some(next_context)) = (&self.next_handler, &self.next_context) {
935 let (mut next_handler, next_context) =
936 (next_handler.borrow_mut(), next_context.borrow());
937 next_handler.handle_close_internal(&*next_context);
938 } else {
939 warn!("handle_close reached end of pipeline");
940 }
941 }
942}
943
944impl<Rin: 'static, Rout: 'static, Win: 'static, Wout: 'static> ContextInternal
945 for Context<Rin, Rout, Win, Wout>
946{
947 fn fire_transport_active_internal(&self) {
948 self.fire_transport_active();
949 }
950 fn fire_transport_inactive_internal(&self) {
951 self.fire_transport_inactive();
952 }
953 fn fire_handle_read_internal(&self, msg: Box<dyn Any>) {
954 if let Ok(msg) = msg.downcast::<Rout>() {
955 self.fire_handle_read(*msg);
956 } else {
957 panic!("msg can't downcast::<Rout> in {} handler", self.name());
958 }
959 }
960 fn fire_poll_write_internal(&self) -> Option<Box<dyn Any>> {
961 if let Some(msg) = self.fire_poll_write() {
962 Some(Box::new(msg))
963 } else {
964 None
965 }
966 }
967
968 fn fire_handle_timeout_internal(&self, now: Instant) {
969 self.fire_handle_timeout(now);
970 }
971 fn fire_poll_timeout_internal(&self, eto: &mut Instant) {
972 self.fire_poll_timeout(eto);
973 }
974
975 fn fire_handle_eof_internal(&self) {
976 self.fire_handle_eof();
977 }
978 fn fire_handle_error_internal(&self, err: Box<dyn Error>) {
979 self.fire_handle_error(err);
980 }
981 fn fire_handle_close_internal(&self) {
982 self.fire_handle_close();
983 }
984
985 fn name(&self) -> &str {
986 self.name.as_str()
987 }
988 fn as_any(&self) -> &dyn Any {
989 self
990 }
991 fn set_next_context(&mut self, next_context: Option<Rc<RefCell<dyn ContextInternal>>>) {
992 self.next_context = next_context;
993 }
994 fn set_next_handler(&mut self, next_handler: Option<Rc<RefCell<dyn HandlerInternal>>>) {
995 self.next_handler = next_handler;
996 }
997}