sansio/
handler.rs

1//! # Handler Trait - Pipeline-Based Protocol Processing
2//!
3//! The [`Handler`] trait is the core building block for constructing protocol pipelines in sansio.
4//! Handlers process messages flowing through a pipeline, transforming them from one type to another
5//! while maintaining strict type safety.
6//!
7//! ## Overview
8//!
9//! A handler is a bidirectional message processor that sits in a pipeline:
10//! - **Inbound direction**: Receives `Rin`, processes it, produces `Rout` for the next handler
11//! - **Outbound direction**: Receives `Win`, processes it, produces `Wout` for the previous handler
12//!
13//! Each handler has a [`Context`] that allows it to:
14//! - Forward messages to the next handler in the pipeline
15//! - Poll messages from the next handler
16//! - Propagate events (timeouts, errors, EOF, etc.)
17//!
18//! ## Type Parameters
19//!
20//! Handlers have four associated types that define the transformation:
21//!
22//! - `Rin`: **R**ead **in**put - What this handler receives from the previous handler (inbound)
23//! - `Rout`: **R**ead **out**put - What this handler produces for the next handler (inbound)
24//! - `Win`: **W**rite **in**put - What this handler receives from the next handler (outbound)
25//! - `Wout`: **W**rite **out**put - What this handler produces for the previous handler (outbound)
26//!
27//! ## Handler Chain Type Flow
28//!
29//! ```text
30//! Pipeline: Handler1 -> Handler2 -> Handler3
31//!
32//! Inbound (handle_read):
33//!   Handler1: Rin1 -> Rout1
34//!   Handler2: Rin2 (= Rout1) -> Rout2
35//!   Handler3: Rin3 (= Rout2) -> Rout3
36//!
37//! Outbound (poll_write):
38//!   Handler3: Win3 -> Wout3
39//!   Handler2: Win2 (= Wout3) -> Wout2
40//!   Handler1: Win1 (= Wout2) -> Wout1
41//! ```
42//!
43//! ## Example: Echo Handler
44//!
45//! ```rust
46//! use sansio::{Handler, Context};
47//! use std::collections::VecDeque;
48//!
49//! /// Echo handler that receives strings and queues them to be sent back
50//! struct EchoHandler {
51//!     output_queue: VecDeque<String>,
52//! }
53//!
54//! impl EchoHandler {
55//!     fn new() -> Self {
56//!         Self {
57//!             output_queue: VecDeque::new(),
58//!         }
59//!     }
60//! }
61//!
62//! impl Handler for EchoHandler {
63//!     // Input type: String
64//!     type Rin = String;
65//!     // Output type: Same as input (passthrough for inbound)
66//!     type Rout = String;
67//!     // Write input type: String
68//!     type Win = String;
69//!     // Write output type: String to send back
70//!     type Wout = String;
71//!
72//!     fn name(&self) -> &str {
73//!         "EchoHandler"
74//!     }
75//!
76//!     fn handle_read(&mut self, ctx: &Context<Self::Rin, Self::Rout, Self::Win, Self::Wout>, msg: Self::Rin) {
77//!         println!("Echoing: {}", msg);
78//!         // Queue the message to be sent back
79//!         self.output_queue.push_back(msg.clone());
80//!         // Also forward to next handler (if any)
81//!         ctx.fire_handle_read(msg);
82//!     }
83//!
84//!     fn poll_write(&mut self, ctx: &Context<Self::Rin, Self::Rout, Self::Win, Self::Wout>) -> Option<Self::Wout> {
85//!         // First check if next handler has data
86//!         if let Some(msg) = ctx.fire_poll_write() {
87//!             return Some(msg);
88//!         }
89//!         // Then return our queued data
90//!         self.output_queue.pop_front()
91//!     }
92//! }
93//! ```
94//!
95//! ## Example: Protocol Codec Handler
96//!
97//! ```rust
98//! use sansio::{Handler, Context};
99//! use std::collections::VecDeque;
100//!
101//! # #[derive(Debug, Clone)]
102//! # struct Message { data: String }
103//!
104//! /// Decodes bytes into messages, encodes messages into bytes
105//! struct ProtocolCodec {
106//!     read_buffer: Vec<u8>,
107//!     write_queue: VecDeque<Vec<u8>>,
108//! }
109//!
110//! impl Handler for ProtocolCodec {
111//!     // Inbound: receive bytes, produce Messages
112//!     type Rin = Vec<u8>;
113//!     type Rout = Message;
114//!     // Outbound: receive Messages, produce bytes
115//!     type Win = Message;
116//!     type Wout = Vec<u8>;
117//!
118//!     fn name(&self) -> &str {
119//!         "ProtocolCodec"
120//!     }
121//!
122//!     fn handle_read(&mut self, ctx: &Context<Self::Rin, Self::Rout, Self::Win, Self::Wout>, msg: Self::Rin) {
123//!         // Decode bytes into message
124//!         self.read_buffer.extend_from_slice(&msg);
125//!
126//!         // Simple example: assume we have a complete message
127//!         if let Ok(s) = String::from_utf8(self.read_buffer.clone()) {
128//!             let message = Message { data: s };
129//!             self.read_buffer.clear();
130//!             // Forward decoded message to next handler
131//!             ctx.fire_handle_read(message);
132//!         }
133//!     }
134//!
135//!     fn poll_write(&mut self, ctx: &Context<Self::Rin, Self::Rout, Self::Win, Self::Wout>) -> Option<Self::Wout> {
136//!         // Poll for messages from next handler
137//!         if let Some(msg) = ctx.fire_poll_write() {
138//!             // Encode message to bytes
139//!             self.write_queue.push_back(msg.data.into_bytes());
140//!         }
141//!         // Return encoded bytes
142//!         self.write_queue.pop_front()
143//!     }
144//! }
145//! ```
146//!
147//! ## Event Handling
148//!
149//! Handlers can also process lifecycle and error events:
150//!
151//! - **`transport_active`**: Connection established
152//! - **`transport_inactive`**: Connection closed
153//! - **`handle_timeout`**: Periodic timeout for time-based operations
154//! - **`handle_error`**: Error occurred in the pipeline
155//! - **`handle_eof`**: End-of-file/stream reached
156//! - **`handle_close`**: Explicit close request
157//!
158//! ## Context API
159//!
160//! The [`Context`] object provides methods to interact with the pipeline:
161//!
162//! - **`fire_handle_read(msg)`**: Forward a processed inbound message
163//! - **`fire_poll_write()`**: Poll for outbound messages from next handler
164//! - **`fire_transport_active()`**: Propagate transport active event
165//! - **`fire_handle_timeout(now)`**: Propagate timeout event
166//! - **`fire_handle_error(err)`**: Propagate error event
167//!
168//! ## Best Practices
169//!
170//! 1. **Single Responsibility**: Each handler should do one thing well
171//! 2. **Type Transformation**: Use type parameters to document transformations
172//! 3. **Event Propagation**: Always call `ctx.fire_*` to propagate events unless you explicitly want to stop them
173//! 4. **Error Handling**: Convert errors into appropriate types or propagate via `fire_handle_error`
174//! 5. **Buffering**: Use internal queues when transformations are one-to-many or many-to-one
175//!
176//! ## Comparison with Protocol Trait
177//!
178//! | Feature | Handler | Protocol |
179//! |---------|---------|----------|
180//! | Complexity | More complex | Simple |
181//! | Context | Has context object | No context |
182//! | Composition | Pipeline-based | Manual |
183//! | Best for | Protocol stacks | Single protocols |
184//!
185//! Use `Handler` when building complex protocol pipelines with multiple layers.
186//! Use [`Protocol`](crate::Protocol) when you want a simple, self-contained protocol.
187
188use crate::handler_internal::{ContextInternal, HandlerInternal};
189use log::{trace, warn};
190use std::any::Any;
191use std::cell::RefCell;
192use std::marker::PhantomData;
193use std::rc::Rc;
194use std::{error::Error, time::Instant};
195
196/// A handler processes messages flowing through a pipeline.
197///
198/// The `Handler` trait defines bidirectional message processing with strict type safety.
199/// Each handler transforms messages from one type to another, enabling complex protocol
200/// stacks to be built from simple, composable components.
201///
202/// # Type Parameters
203///
204/// - `Rin`: Read input - what this handler receives (inbound direction)
205/// - `Rout`: Read output - what this handler produces (inbound direction)
206/// - `Win`: Write input - what this handler receives (outbound direction)
207/// - `Wout`: Write output - what this handler produces (outbound direction)
208///
209/// # Design Pattern
210///
211/// Handlers follow the Intercepting Filter pattern:
212/// 1. Receive messages from one direction
213/// 2. Transform the message (decode, encode, validate, etc.)
214/// 3. Forward to the next handler in the pipeline
215/// 4. Optionally handle events (timeouts, errors, lifecycle)
216///
217/// # Example
218///
219/// See the [module-level documentation](index.html) for complete examples.
220pub trait Handler {
221    /// Read input message type.
222    ///
223    /// This is the type of messages this handler receives from the previous handler
224    /// in the pipeline (inbound direction).
225    type Rin: 'static;
226
227    /// Read output message type.
228    ///
229    /// This is the type of messages this handler produces for the next handler
230    /// in the pipeline (inbound direction).
231    type Rout: 'static;
232
233    /// Write input message type.
234    ///
235    /// This is the type of messages this handler receives from the next handler
236    /// in the pipeline (outbound direction).
237    type Win: 'static;
238
239    /// Write output message type.
240    ///
241    /// This is the type of messages this handler produces for the previous handler
242    /// in the pipeline (outbound direction).
243    type Wout: 'static;
244
245    /// Returns the handler's name.
246    ///
247    /// Used for debugging, logging, and error messages. Should be unique
248    /// within a pipeline to help identify which handler is being referenced.
249    ///
250    /// # Example
251    ///
252    /// ```rust
253    /// # use sansio::Handler;
254    /// # struct MyHandler;
255    /// # impl Handler for MyHandler {
256    /// #     type Rin = String;
257    /// #     type Rout = String;
258    /// #     type Win = String;
259    /// #     type Wout = String;
260    /// fn name(&self) -> &str {
261    ///     "MyProtocolHandler"
262    /// }
263    /// #     fn handle_read(&mut self, _ctx: &sansio::Context<Self::Rin, Self::Rout, Self::Win, Self::Wout>, _msg: Self::Rin) {}
264    /// #     fn poll_write(&mut self, _ctx: &sansio::Context<Self::Rin, Self::Rout, Self::Win, Self::Wout>) -> Option<Self::Wout> { None }
265    /// # }
266    /// ```
267    fn name(&self) -> &str;
268
269    #[doc(hidden)]
270    #[allow(clippy::type_complexity)]
271    fn generate(
272        self,
273    ) -> (
274        String,
275        Rc<RefCell<dyn HandlerInternal>>,
276        Rc<RefCell<dyn ContextInternal>>,
277    )
278    where
279        Self: Sized + 'static,
280    {
281        let handler_name = self.name().to_owned();
282        let context: Context<Self::Rin, Self::Rout, Self::Win, Self::Wout> =
283            Context::new(self.name());
284
285        let handler: Box<
286            dyn Handler<Rin = Self::Rin, Rout = Self::Rout, Win = Self::Win, Wout = Self::Wout>,
287        > = Box::new(self);
288
289        (
290            handler_name,
291            Rc::new(RefCell::new(handler)),
292            Rc::new(RefCell::new(context)),
293        )
294    }
295
296    /// Called when the transport becomes active (connected).
297    ///
298    /// This event is triggered when a connection is established. Handlers can use this
299    /// to initialize state, start timers, or perform handshakes.
300    ///
301    /// The default implementation propagates the event to the next handler.
302    ///
303    /// # Parameters
304    ///
305    /// - `ctx`: Context for interacting with the pipeline
306    ///
307    /// # Example
308    ///
309    /// ```rust
310    /// # use sansio::{Handler, Context};
311    /// # struct MyHandler { connected: bool }
312    /// # impl Handler for MyHandler {
313    /// #     type Rin = String;
314    /// #     type Rout = String;
315    /// #     type Win = String;
316    /// #     type Wout = String;
317    /// #     fn name(&self) -> &str { "MyHandler" }
318    /// fn transport_active(&mut self, ctx: &Context<Self::Rin, Self::Rout, Self::Win, Self::Wout>) {
319    ///     self.connected = true;
320    ///     println!("Connection established!");
321    ///     // Propagate to next handler
322    ///     ctx.fire_transport_active();
323    /// }
324    /// #     fn handle_read(&mut self, _ctx: &Context<Self::Rin, Self::Rout, Self::Win, Self::Wout>, _msg: Self::Rin) {}
325    /// #     fn poll_write(&mut self, _ctx: &Context<Self::Rin, Self::Rout, Self::Win, Self::Wout>) -> Option<Self::Wout> { None }
326    /// # }
327    /// ```
328    fn transport_active(&mut self, ctx: &Context<Self::Rin, Self::Rout, Self::Win, Self::Wout>) {
329        ctx.fire_transport_active();
330    }
331
332    /// Called when the transport becomes inactive (disconnected).
333    ///
334    /// This event is triggered when a connection is closed. Handlers can use this
335    /// to clean up resources, cancel timers, or save state.
336    ///
337    /// The default implementation propagates the event to the next handler.
338    ///
339    /// # Parameters
340    ///
341    /// - `ctx`: Context for interacting with the pipeline
342    fn transport_inactive(&mut self, ctx: &Context<Self::Rin, Self::Rout, Self::Win, Self::Wout>) {
343        ctx.fire_transport_inactive();
344    }
345
346    /// Handles an inbound message.
347    ///
348    /// This is the primary method for processing incoming data. The handler receives
349    /// a message of type `Rin`, processes it, and can forward transformed messages
350    /// of type `Rout` to the next handler via `ctx.fire_handle_read(msg)`.
351    ///
352    /// # Parameters
353    ///
354    /// - `ctx`: Context for interacting with the pipeline
355    /// - `msg`: The incoming message to process
356    ///
357    /// # Example
358    ///
359    /// ```rust
360    /// # use sansio::{Handler, Context};
361    /// # struct ToUpperHandler;
362    /// # impl Handler for ToUpperHandler {
363    /// #     type Rin = String;
364    /// #     type Rout = String;
365    /// #     type Win = String;
366    /// #     type Wout = String;
367    /// #     fn name(&self) -> &str { "ToUpperHandler" }
368    /// fn handle_read(&mut self, ctx: &Context<Self::Rin, Self::Rout, Self::Win, Self::Wout>, msg: Self::Rin) {
369    ///     // Transform the message
370    ///     let transformed = msg.to_uppercase();
371    ///     // Forward to next handler
372    ///     ctx.fire_handle_read(transformed);
373    /// }
374    /// #     fn poll_write(&mut self, _ctx: &Context<Self::Rin, Self::Rout, Self::Win, Self::Wout>) -> Option<Self::Wout> { None }
375    /// # }
376    /// ```
377    fn handle_read(
378        &mut self,
379        ctx: &Context<Self::Rin, Self::Rout, Self::Win, Self::Wout>,
380        msg: Self::Rin,
381    );
382
383    /// Polls for an outbound message.
384    ///
385    /// This method is called to retrieve messages that should be written to the transport.
386    /// Handlers typically:
387    /// 1. Poll the next handler via `ctx.fire_poll_write()`
388    /// 2. Transform received messages or generate their own
389    /// 3. Return messages of type `Wout`
390    ///
391    /// Returns `None` when no message is available.
392    ///
393    /// # Parameters
394    ///
395    /// - `ctx`: Context for interacting with the pipeline
396    ///
397    /// # Returns
398    ///
399    /// - `Some(Wout)`: A message ready to be written
400    /// - `None`: No message available
401    ///
402    /// # Example
403    ///
404    /// ```rust
405    /// # use sansio::{Handler, Context};
406    /// # use std::collections::VecDeque;
407    /// # struct BufferingHandler { queue: VecDeque<String> }
408    /// # impl Handler for BufferingHandler {
409    /// #     type Rin = String;
410    /// #     type Rout = String;
411    /// #     type Win = String;
412    /// #     type Wout = String;
413    /// #     fn name(&self) -> &str { "BufferingHandler" }
414    /// fn poll_write(&mut self, ctx: &Context<Self::Rin, Self::Rout, Self::Win, Self::Wout>) -> Option<Self::Wout> {
415    ///     // First check next handler
416    ///     if let Some(msg) = ctx.fire_poll_write() {
417    ///         return Some(msg);
418    ///     }
419    ///     // Then check our own queue
420    ///     self.queue.pop_front()
421    /// }
422    /// #     fn handle_read(&mut self, _ctx: &Context<Self::Rin, Self::Rout, Self::Win, Self::Wout>, _msg: Self::Rin) {}
423    /// # }
424    /// ```
425    fn poll_write(
426        &mut self,
427        ctx: &Context<Self::Rin, Self::Rout, Self::Win, Self::Wout>,
428    ) -> Option<Self::Wout>;
429
430    /// Handles a timeout event.
431    ///
432    /// Called periodically to allow handlers to perform time-based operations like:
433    /// - Sending keepalive messages
434    /// - Timing out pending operations
435    /// - Retransmitting lost packets
436    ///
437    /// The default implementation propagates the event to the next handler.
438    ///
439    /// # Parameters
440    ///
441    /// - `ctx`: Context for interacting with the pipeline
442    /// - `now`: The current timestamp
443    fn handle_timeout(
444        &mut self,
445        ctx: &Context<Self::Rin, Self::Rout, Self::Win, Self::Wout>,
446        now: Instant,
447    ) {
448        ctx.fire_handle_timeout(now);
449    }
450
451    /// Polls for the next timeout deadline.
452    ///
453    /// Handlers can update `eto` (earliest timeout) to indicate when they next
454    /// need `handle_timeout` to be called. The pipeline will call the handler
455    /// with the minimum timeout across all handlers.
456    ///
457    /// The default implementation propagates the poll to the next handler.
458    ///
459    /// # Parameters
460    ///
461    /// - `ctx`: Context for interacting with the pipeline
462    /// - `eto`: Mutable reference to the earliest timeout. Update this to request an earlier timeout.
463    ///
464    /// # Example
465    ///
466    /// ```rust
467    /// # use sansio::{Handler, Context};
468    /// # use std::time::{Instant, Duration};
469    /// # struct KeepaliveHandler { next_keepalive: Option<Instant> }
470    /// # impl Handler for KeepaliveHandler {
471    /// #     type Rin = String;
472    /// #     type Rout = String;
473    /// #     type Win = String;
474    /// #     type Wout = String;
475    /// #     fn name(&self) -> &str { "KeepaliveHandler" }
476    /// fn poll_timeout(&mut self, ctx: &Context<Self::Rin, Self::Rout, Self::Win, Self::Wout>, eto: &mut Instant) {
477    ///     // Request timeout for our keepalive
478    ///     if let Some(deadline) = self.next_keepalive {
479    ///         if deadline < *eto {
480    ///             *eto = deadline;
481    ///         }
482    ///     }
483    ///     // Also check next handler
484    ///     ctx.fire_poll_timeout(eto);
485    /// }
486    /// #     fn handle_read(&mut self, _ctx: &Context<Self::Rin, Self::Rout, Self::Win, Self::Wout>, _msg: Self::Rin) {}
487    /// #     fn poll_write(&mut self, _ctx: &Context<Self::Rin, Self::Rout, Self::Win, Self::Wout>) -> Option<Self::Wout> { None }
488    /// # }
489    /// ```
490    fn poll_timeout(
491        &mut self,
492        ctx: &Context<Self::Rin, Self::Rout, Self::Win, Self::Wout>,
493        eto: &mut Instant,
494    ) {
495        ctx.fire_poll_timeout(eto);
496    }
497
498    /// Handles an end-of-file (EOF) event.
499    ///
500    /// Called when the input stream reaches EOF (e.g., remote peer closed their write side).
501    /// Handlers can use this to flush buffers or initiate graceful shutdown.
502    ///
503    /// The default implementation propagates the event to the next handler.
504    ///
505    /// # Parameters
506    ///
507    /// - `ctx`: Context for interacting with the pipeline
508    fn handle_eof(&mut self, ctx: &Context<Self::Rin, Self::Rout, Self::Win, Self::Wout>) {
509        ctx.fire_handle_eof();
510    }
511
512    /// Handles an error event.
513    ///
514    /// Called when an error occurs in the pipeline. Handlers can:
515    /// - Log the error
516    /// - Attempt recovery
517    /// - Transform the error
518    /// - Propagate it to the next handler
519    ///
520    /// The default implementation propagates the error to the next handler.
521    ///
522    /// # Parameters
523    ///
524    /// - `ctx`: Context for interacting with the pipeline
525    /// - `err`: The error that occurred
526    fn handle_error(
527        &mut self,
528        ctx: &Context<Self::Rin, Self::Rout, Self::Win, Self::Wout>,
529        err: Box<dyn Error>,
530    ) {
531        ctx.fire_handle_error(err);
532    }
533
534    /// Handles a close event.
535    ///
536    /// Called when the pipeline is being closed. Handlers should clean up resources,
537    /// flush buffers, and prepare for shutdown.
538    ///
539    /// The default implementation propagates the event to the next handler.
540    ///
541    /// # Parameters
542    ///
543    /// - `ctx`: Context for interacting with the pipeline
544    fn handle_close(&mut self, ctx: &Context<Self::Rin, Self::Rout, Self::Win, Self::Wout>) {
545        ctx.fire_handle_close();
546    }
547}
548
549impl<Rin: 'static, Rout: 'static, Win: 'static, Wout: 'static> HandlerInternal
550    for Box<dyn Handler<Rin = Rin, Rout = Rout, Win = Win, Wout = Wout>>
551{
552    fn transport_active_internal(&mut self, ctx: &dyn ContextInternal) {
553        if let Some(ctx) = ctx.as_any().downcast_ref::<Context<Rin, Rout, Win, Wout>>() {
554            self.transport_active(ctx);
555        } else {
556            panic!(
557                "ctx can't downcast_ref::<Context<Rin, Rout, Win, Wout>> in {} handler",
558                ctx.name()
559            );
560        }
561    }
562    fn transport_inactive_internal(&mut self, ctx: &dyn ContextInternal) {
563        if let Some(ctx) = ctx.as_any().downcast_ref::<Context<Rin, Rout, Win, Wout>>() {
564            self.transport_inactive(ctx);
565        } else {
566            panic!(
567                "ctx can't downcast_ref::<Context<Rin, Rout, Win, Wout>> in {} handler",
568                ctx.name()
569            );
570        }
571    }
572
573    fn handle_read_internal(&mut self, ctx: &dyn ContextInternal, msg: Box<dyn Any>) {
574        if let Some(ctx) = ctx.as_any().downcast_ref::<Context<Rin, Rout, Win, Wout>>() {
575            if let Ok(msg) = msg.downcast::<Rin>() {
576                self.handle_read(ctx, *msg);
577            } else {
578                panic!("msg can't downcast::<Rin> in {} handler", ctx.name());
579            }
580        } else {
581            panic!(
582                "ctx can't downcast::<Context<Rin, Rout, Win, Wout>> in {} handler",
583                ctx.name()
584            );
585        }
586    }
587    fn poll_write_internal(&mut self, ctx: &dyn ContextInternal) -> Option<Box<dyn Any>> {
588        if let Some(ctx) = ctx.as_any().downcast_ref::<Context<Rin, Rout, Win, Wout>>() {
589            if let Some(msg) = self.poll_write(ctx) {
590                Some(Box::new(msg))
591            } else {
592                None
593            }
594        } else {
595            panic!(
596                "ctx can't downcast_ref::<OutboundContext<Win, Wout>> in {} handler",
597                ctx.name()
598            );
599        }
600    }
601
602    fn handle_timeout_internal(&mut self, ctx: &dyn ContextInternal, now: Instant) {
603        if let Some(ctx) = ctx.as_any().downcast_ref::<Context<Rin, Rout, Win, Wout>>() {
604            self.handle_timeout(ctx, now);
605        } else {
606            panic!(
607                "ctx can't downcast_ref::<Context<Rin, Rout, Win, Wout>> in {} handler",
608                ctx.name()
609            );
610        }
611    }
612    fn poll_timeout_internal(&mut self, ctx: &dyn ContextInternal, eto: &mut Instant) {
613        if let Some(ctx) = ctx.as_any().downcast_ref::<Context<Rin, Rout, Win, Wout>>() {
614            self.poll_timeout(ctx, eto);
615        } else {
616            panic!(
617                "ctx can't downcast_ref::<Context<Rin, Rout, Win, Wout>> in {} handler",
618                ctx.name()
619            );
620        }
621    }
622
623    fn handle_eof_internal(&mut self, ctx: &dyn ContextInternal) {
624        if let Some(ctx) = ctx.as_any().downcast_ref::<Context<Rin, Rout, Win, Wout>>() {
625            self.handle_eof(ctx);
626        } else {
627            panic!(
628                "ctx can't downcast_ref::<Context<Rin, Rout, Win, Wout>> in {} handler",
629                ctx.name()
630            );
631        }
632    }
633    fn handle_error_internal(&mut self, ctx: &dyn ContextInternal, err: Box<dyn Error>) {
634        if let Some(ctx) = ctx.as_any().downcast_ref::<Context<Rin, Rout, Win, Wout>>() {
635            self.handle_error(ctx, err);
636        } else {
637            panic!(
638                "ctx can't downcast_ref::<Context<Rin, Rout, Win, Wout>> in {} handler",
639                ctx.name()
640            );
641        }
642    }
643    fn handle_close_internal(&mut self, ctx: &dyn ContextInternal) {
644        if let Some(ctx) = ctx.as_any().downcast_ref::<Context<Rin, Rout, Win, Wout>>() {
645            self.handle_close(ctx);
646        } else {
647            panic!(
648                "ctx can't downcast_ref::<OutboundContext<Win, Wout>> in {} handler",
649                ctx.name()
650            );
651        }
652    }
653}
654
655/// Enables a [`Handler`] to interact with the pipeline.
656///
657/// The `Context` object is passed to each handler method and provides the API
658/// for forwarding messages, polling data, and propagating events through the
659/// pipeline.
660///
661/// # Type Parameters
662///
663/// - `Rin`: Read input message type (what the current handler receives)
664/// - `Rout`: Read output message type (what the current handler produces)
665/// - `Win`: Write input message type (what the current handler receives for writing)
666/// - `Wout`: Write output message type (what the current handler produces for writing)
667///
668/// # Responsibilities
669///
670/// The context provides two primary functions:
671///
672/// 1. **Forward Operations**: Push data/events down the pipeline
673///    - `fire_handle_read()`: Forward processed inbound messages
674///    - `fire_transport_active()`: Propagate connection established event
675///    - `fire_handle_timeout()`: Propagate timeout events
676///
677/// 2. **Pull Operations**: Pull data from the next handler
678///    - `fire_poll_write()`: Poll outbound messages from next handler
679///    - `fire_poll_timeout()`: Poll timeout requirements from next handler
680///
681/// # Example
682///
683/// ```rust
684/// use sansio::{Handler, Context};
685///
686/// struct MyHandler;
687///
688/// impl Handler for MyHandler {
689///     type Rin = String;
690///     type Rout = String;
691///     type Win = String;
692///     type Wout = String;
693///
694///     fn name(&self) -> &str {
695///         "MyHandler"
696///     }
697///
698///     fn handle_read(&mut self, ctx: &Context<Self::Rin, Self::Rout, Self::Win, Self::Wout>, msg: Self::Rin) {
699///         // Use context to forward message
700///         ctx.fire_handle_read(msg);
701///     }
702///
703///     fn poll_write(&mut self, ctx: &Context<Self::Rin, Self::Rout, Self::Win, Self::Wout>) -> Option<Self::Wout> {
704///         // Use context to poll next handler
705///         ctx.fire_poll_write()
706///     }
707/// }
708/// ```
709pub struct Context<Rin, Rout, Win, Wout> {
710    name: String,
711
712    next_context: Option<Rc<RefCell<dyn ContextInternal>>>,
713    next_handler: Option<Rc<RefCell<dyn HandlerInternal>>>,
714
715    phantom: PhantomData<(Rin, Rout, Win, Wout)>,
716}
717
718impl<Rin: 'static, Rout: 'static, Win: 'static, Wout: 'static> Context<Rin, Rout, Win, Wout> {
719    /// Creates a new Context.
720    ///
721    /// Typically called internally by the pipeline when adding a handler.
722    /// You rarely need to create a Context manually.
723    ///
724    /// # Parameters
725    ///
726    /// - `name`: The name of the handler this context belongs to
727    pub fn new(name: &str) -> Self {
728        Self {
729            name: name.to_string(),
730
731            next_context: None,
732            next_handler: None,
733
734            phantom: PhantomData,
735        }
736    }
737
738    /// Propagates transport active event to the next handler.
739    ///
740    /// Call this to forward the "connection established" event through the pipeline.
741    /// This should be called from your [`Handler::transport_active`] implementation
742    /// unless you explicitly want to stop event propagation.
743    pub fn fire_transport_active(&self) {
744        if let (Some(next_handler), Some(next_context)) = (&self.next_handler, &self.next_context) {
745            let (mut next_handler, next_context) =
746                (next_handler.borrow_mut(), next_context.borrow());
747            next_handler.transport_active_internal(&*next_context);
748        }
749    }
750
751    /// Propagates transport inactive event to the next handler.
752    ///
753    /// Call this to forward the "connection closed" event through the pipeline.
754    /// This should be called from your [`Handler::transport_inactive`] implementation
755    /// unless you explicitly want to stop event propagation.
756    pub fn fire_transport_inactive(&self) {
757        if let (Some(next_handler), Some(next_context)) = (&self.next_handler, &self.next_context) {
758            let (mut next_handler, next_context) =
759                (next_handler.borrow_mut(), next_context.borrow());
760            next_handler.transport_inactive_internal(&*next_context);
761        }
762    }
763
764    /// Forwards an inbound message to the next handler.
765    ///
766    /// After processing a message in [`Handler::handle_read`], call this method
767    /// to pass the transformed message (`Rout`) to the next handler in the pipeline.
768    ///
769    /// # Parameters
770    ///
771    /// - `msg`: The processed message to forward
772    ///
773    /// # Example
774    ///
775    /// ```rust
776    /// # use sansio::{Handler, Context};
777    /// # struct DecoderHandler;
778    /// # impl Handler for DecoderHandler {
779    /// #     type Rin = Vec<u8>;
780    /// #     type Rout = String;
781    /// #     type Win = String;
782    /// #     type Wout = Vec<u8>;
783    /// #     fn name(&self) -> &str { "DecoderHandler" }
784    /// fn handle_read(&mut self, ctx: &Context<Self::Rin, Self::Rout, Self::Win, Self::Wout>, msg: Self::Rin) {
785    ///     // Decode bytes to string
786    ///     if let Ok(decoded) = String::from_utf8(msg) {
787    ///         // Forward decoded message
788    ///         ctx.fire_handle_read(decoded);
789    ///     }
790    /// }
791    /// #     fn poll_write(&mut self, _ctx: &Context<Self::Rin, Self::Rout, Self::Win, Self::Wout>) -> Option<Self::Wout> { None }
792    /// # }
793    /// ```
794    pub fn fire_handle_read(&self, msg: Rout) {
795        if let (Some(next_handler), Some(next_context)) = (&self.next_handler, &self.next_context) {
796            let (mut next_handler, next_context) =
797                (next_handler.borrow_mut(), next_context.borrow());
798            next_handler.handle_read_internal(&*next_context, Box::new(msg));
799        } else {
800            warn!("handle_read reached end of pipeline");
801        }
802    }
803
804    /// Polls the next handler for an outbound message.
805    ///
806    /// Call this from [`Handler::poll_write`] to retrieve messages from the next
807    /// handler in the pipeline. Returns `Some(Win)` if the next handler has data
808    /// to send, or `None` otherwise.
809    ///
810    /// # Returns
811    ///
812    /// - `Some(Win)`: A message from the next handler
813    /// - `None`: No message available
814    ///
815    /// # Example
816    ///
817    /// ```rust
818    /// # use sansio::{Handler, Context};
819    /// # use std::collections::VecDeque;
820    /// # struct EncoderHandler { queue: VecDeque<Vec<u8>> }
821    /// # impl Handler for EncoderHandler {
822    /// #     type Rin = String;
823    /// #     type Rout = String;
824    /// #     type Win = String;
825    /// #     type Wout = Vec<u8>;
826    /// #     fn name(&self) -> &str { "EncoderHandler" }
827    /// fn poll_write(&mut self, ctx: &Context<Self::Rin, Self::Rout, Self::Win, Self::Wout>) -> Option<Self::Wout> {
828    ///     // Poll next handler first
829    ///     if let Some(msg) = ctx.fire_poll_write() {
830    ///         // Encode string to bytes
831    ///         return Some(msg.into_bytes());
832    ///     }
833    ///     // Then return our own queued data
834    ///     self.queue.pop_front()
835    /// }
836    /// #     fn handle_read(&mut self, _ctx: &Context<Self::Rin, Self::Rout, Self::Win, Self::Wout>, _msg: Self::Rin) {}
837    /// # }
838    /// ```
839    pub fn fire_poll_write(&self) -> Option<Win> {
840        if let (Some(next_handler), Some(next_context)) = (&self.next_handler, &self.next_context) {
841            let (mut next_handler, next_context) =
842                (next_handler.borrow_mut(), next_context.borrow());
843            if let Some(msg) = next_handler.poll_write_internal(&*next_context) {
844                if let Ok(msg) = msg.downcast::<Win>() {
845                    Some(*msg)
846                } else {
847                    panic!(
848                        "msg can't downcast::<Win> in {} handler",
849                        next_context.name()
850                    );
851                }
852            } else {
853                None
854            }
855        } else {
856            warn!("poll_write reached end of pipeline");
857            None
858        }
859    }
860
861    /// Propagates timeout event to the next handler.
862    ///
863    /// Call this from [`Handler::handle_timeout`] to forward the timeout event
864    /// through the pipeline.
865    ///
866    /// # Parameters
867    ///
868    /// - `now`: The current timestamp
869    pub fn fire_handle_timeout(&self, now: Instant) {
870        if let (Some(next_handler), Some(next_context)) = (&self.next_handler, &self.next_context) {
871            let (mut next_handler, next_context) =
872                (next_handler.borrow_mut(), next_context.borrow());
873            next_handler.handle_timeout_internal(&*next_context, now);
874        } else {
875            warn!("handle_timeout reached end of pipeline");
876        }
877    }
878
879    /// Polls the next handler for its timeout deadline.
880    ///
881    /// Call this from [`Handler::poll_timeout`] to allow the next handler to
882    /// update the earliest timeout deadline.
883    ///
884    /// # Parameters
885    ///
886    /// - `eto`: Mutable reference to the earliest timeout. The next handler may update this.
887    pub fn fire_poll_timeout(&self, eto: &mut Instant) {
888        if let (Some(next_handler), Some(next_context)) = (&self.next_handler, &self.next_context) {
889            let (mut next_handler, next_context) =
890                (next_handler.borrow_mut(), next_context.borrow());
891            next_handler.poll_timeout_internal(&*next_context, eto);
892        } else {
893            trace!("poll_timeout reached end of pipeline");
894        }
895    }
896
897    /// Propagates EOF event to the next handler.
898    ///
899    /// Call this from [`Handler::handle_eof`] to forward the end-of-file event
900    /// through the pipeline.
901    pub fn fire_handle_eof(&self) {
902        if let (Some(next_handler), Some(next_context)) = (&self.next_handler, &self.next_context) {
903            let (mut next_handler, next_context) =
904                (next_handler.borrow_mut(), next_context.borrow());
905            next_handler.handle_eof_internal(&*next_context);
906        } else {
907            warn!("handle_eof reached end of pipeline");
908        }
909    }
910
911    /// Propagates error event to the next handler.
912    ///
913    /// Call this from [`Handler::handle_error`] to forward the error through
914    /// the pipeline.
915    ///
916    /// # Parameters
917    ///
918    /// - `err`: The error to propagate
919    pub fn fire_handle_error(&self, err: Box<dyn Error>) {
920        if let (Some(next_handler), Some(next_context)) = (&self.next_handler, &self.next_context) {
921            let (mut next_handler, next_context) =
922                (next_handler.borrow_mut(), next_context.borrow());
923            next_handler.handle_error_internal(&*next_context, err);
924        } else {
925            warn!("handle_error reached end of pipeline");
926        }
927    }
928
929    /// Propagates close event to the next handler.
930    ///
931    /// Call this from [`Handler::handle_close`] to forward the close request
932    /// through the pipeline.
933    pub fn fire_handle_close(&self) {
934        if let (Some(next_handler), Some(next_context)) = (&self.next_handler, &self.next_context) {
935            let (mut next_handler, next_context) =
936                (next_handler.borrow_mut(), next_context.borrow());
937            next_handler.handle_close_internal(&*next_context);
938        } else {
939            warn!("handle_close reached end of pipeline");
940        }
941    }
942}
943
944impl<Rin: 'static, Rout: 'static, Win: 'static, Wout: 'static> ContextInternal
945    for Context<Rin, Rout, Win, Wout>
946{
947    fn fire_transport_active_internal(&self) {
948        self.fire_transport_active();
949    }
950    fn fire_transport_inactive_internal(&self) {
951        self.fire_transport_inactive();
952    }
953    fn fire_handle_read_internal(&self, msg: Box<dyn Any>) {
954        if let Ok(msg) = msg.downcast::<Rout>() {
955            self.fire_handle_read(*msg);
956        } else {
957            panic!("msg can't downcast::<Rout> in {} handler", self.name());
958        }
959    }
960    fn fire_poll_write_internal(&self) -> Option<Box<dyn Any>> {
961        if let Some(msg) = self.fire_poll_write() {
962            Some(Box::new(msg))
963        } else {
964            None
965        }
966    }
967
968    fn fire_handle_timeout_internal(&self, now: Instant) {
969        self.fire_handle_timeout(now);
970    }
971    fn fire_poll_timeout_internal(&self, eto: &mut Instant) {
972        self.fire_poll_timeout(eto);
973    }
974
975    fn fire_handle_eof_internal(&self) {
976        self.fire_handle_eof();
977    }
978    fn fire_handle_error_internal(&self, err: Box<dyn Error>) {
979        self.fire_handle_error(err);
980    }
981    fn fire_handle_close_internal(&self) {
982        self.fire_handle_close();
983    }
984
985    fn name(&self) -> &str {
986        self.name.as_str()
987    }
988    fn as_any(&self) -> &dyn Any {
989        self
990    }
991    fn set_next_context(&mut self, next_context: Option<Rc<RefCell<dyn ContextInternal>>>) {
992        self.next_context = next_context;
993    }
994    fn set_next_handler(&mut self, next_handler: Option<Rc<RefCell<dyn HandlerInternal>>>) {
995        self.next_handler = next_handler;
996    }
997}