Skip to main content

theater_handler_tcp/
lib.rs

1//! # TCP Handler
2//!
3//! Provides raw TCP networking capabilities to WebAssembly actors in the Theater system.
4//! This handler is deliberately minimal — it moves bytes across the boundary and leaves
5//! all protocol complexity (framing, routing, addressing) to actor-space code.
6//!
7//! ## Connection Handoff
8//!
9//! Connections can be transferred between actors for the "accept and hand off" pattern:
10//!
11//! 1. Acceptor calls `accept()` - connection starts in PENDING state
12//! 2. Acceptor spawns a worker actor
13//! 3. Acceptor calls `transfer(conn_id, worker_id)` - atomically transfers and activates
14//! 4. Worker receives `handle-connection` callback and can immediately send/receive
15//!
16//! This prevents race conditions where data arrives before the handoff completes.
17//!
18//! ## Data Modes (Erlang-style)
19//!
20//! Connections support three data modes via `set-active()`:
21//!
22//! - `"passive"` (default): Data received only via explicit `receive()` calls
23//! - `"active"`: Data pushed to actor via `on-data` callback continuously
24//! - `"once"`: Single `on-data` callback, then switches back to passive
25//!
26//! This matches Erlang/OTP's `{active, true/false/once}` socket options.
27//!
28//! ## TLS Support
29//!
30//! TLS can be enabled via manifest configuration:
31//!
32//! ```toml
33//! [[handler]]
34//! type = "tcp"
35//!
36//! [handler.client_tls]
37//! enabled = true
38//! # ca_cert = "/path/to/ca.pem"  # Optional custom CA
39//! # skip_verify = false          # For development only
40//!
41//! [handler.server_tls]
42//! enabled = true
43//! cert = "/path/to/server.pem"
44//! key = "/path/to/server-key.pem"
45//! ```
46//!
47//! When TLS is configured, connections are automatically encrypted. The actor
48//! code doesn't need to change - it uses the same `tcp-connect`, `tcp-listen`,
49//! `tcp-read`, `tcp-write` interface.
50
51mod stream;
52mod tls;
53
54use std::collections::HashMap;
55use std::future::Future;
56use std::net::SocketAddr;
57use std::pin::Pin;
58use std::sync::atomic::{AtomicU64, Ordering};
59use std::sync::Arc;
60use std::time::Instant;
61use tokio::io::{AsyncReadExt, AsyncWriteExt};
62use tokio::net::{TcpListener, TcpStream};
63use tokio::sync::Mutex;
64use tokio_util::sync::CancellationToken;
65use tracing::{debug, error, info, warn};
66
67use stream::{UnifiedReadHalf, UnifiedStream, UnifiedWriteHalf};
68use tls::TlsContext;
69
70use theater::actor::handle::ActorHandle;
71use theater::actor::store::ActorStore;
72use theater::config::actor_manifest::{HandlerConfig, TcpHandlerConfig};
73use theater::handler::{Handler, HandlerContext, SharedActorInstance};
74use theater::id::TheaterId;
75use theater::shutdown::ShutdownReceiver;
76
77use theater::pack_bridge::{
78    parse_pact, AsyncCtx, HostLinkerBuilder, InterfaceImpl, LinkerError, TypeHash, Value, ValueType,
79};
80
81// ============================================================================
82// Interface Declarations
83// ============================================================================
84
85/// Drops at scope exit and emits a `phase=... elapsed_ms=...` debug line.
86/// One line per host fn invocation, regardless of which return path was
87/// taken (incl. `?` short-circuits).
88struct PhaseLog {
89    name: &'static str,
90    start: Instant,
91}
92
93impl PhaseLog {
94    fn new(name: &'static str) -> Self {
95        Self {
96            name,
97            start: Instant::now(),
98        }
99    }
100}
101
102impl Drop for PhaseLog {
103    fn drop(&mut self) {
104        debug!(
105            phase = self.name,
106            elapsed_ms = self.start.elapsed().as_millis() as u64,
107            "tcp phase complete",
108        );
109    }
110}
111
112/// Embedded tcp.pact file content
113const TCP_PACT: &str = include_str!("../tcp.pact");
114
115/// Declare the theater:simple/tcp interface from the pact file.
116fn tcp_interface() -> InterfaceImpl {
117    let pact = parse_pact(TCP_PACT).expect("embedded tcp.pact should be valid");
118    InterfaceImpl::from_pact(&pact)
119}
120
121// ============================================================================
122// Connection State
123// ============================================================================
124
125/// State of a connection in its lifecycle
126#[derive(Debug, Clone, Copy, PartialEq, Eq)]
127enum ConnectionState {
128    /// Connection accepted but not yet activated - no data operations allowed
129    Pending,
130    /// Connection is active - send/receive allowed
131    Active,
132}
133
134/// Data mode for receiving data (Erlang-style)
135#[derive(Debug, Clone, Copy, PartialEq, Eq)]
136enum DataMode {
137    /// Data only received via explicit receive() calls
138    Passive,
139    /// Data pushed to on-data callback continuously
140    Active,
141    /// Receive one chunk via on-data, then switch to Passive
142    Once,
143}
144
145/// Represents the stream state based on data mode
146enum StreamState {
147    /// Full stream available for passive mode operations
148    Full(Box<UnifiedStream>),
149    /// Only write half available - read half taken by active mode task
150    WriteOnly(UnifiedWriteHalf),
151    /// Connection closed or stream taken
152    Closed,
153}
154
155/// A tracked TCP connection with ownership and state.
156///
157/// `stream` is wrapped in `Arc<Mutex<...>>` so the outer connections map
158/// mutex is only held briefly for lookup/metadata. The actual I/O acquires
159/// the per-connection lock — this lets two actors do I/O on different
160/// connections in parallel, which is essential for any flow where the
161/// runtime hosts both sides of a TCP conversation (e.g. an outbound SMTP
162/// client talking to a local SMTP server in the same theater instance).
163struct ConnectionEntry {
164    stream: Arc<Mutex<StreamState>>,
165    peer_addr: SocketAddr,
166    owner: TheaterId,
167    state: ConnectionState,
168    data_mode: DataMode,
169}
170
171/// A tracked TCP listener with ownership
172struct ListenerEntry {
173    listener: TcpListener,
174    owner: TheaterId,
175}
176
177/// Shared TCP state across all actor instances.
178///
179/// This state is shared via Arc, so all TcpHandler instances in a Theater
180/// runtime see the same connections and listeners. This enables connection
181/// transfer between actors.
182struct SharedTcpState {
183    connections: Mutex<HashMap<u64, ConnectionEntry>>,
184    listeners: Mutex<HashMap<u64, ListenerEntry>>,
185    next_id: AtomicU64,
186    max_connections: Option<u32>,
187}
188
189impl SharedTcpState {
190    fn new(max_connections: Option<u32>) -> Self {
191        Self {
192            connections: Mutex::new(HashMap::new()),
193            listeners: Mutex::new(HashMap::new()),
194            next_id: AtomicU64::new(1),
195            max_connections,
196        }
197    }
198
199    fn next_id(&self) -> u64 {
200        self.next_id.fetch_add(1, Ordering::Relaxed)
201    }
202
203    async fn check_connection_limit(&self) -> Result<(), Value> {
204        if let Some(max) = self.max_connections {
205            let count = self.connections.lock().await.len();
206            if count >= max as usize {
207                return Err(Value::String(format!(
208                    "Connection limit reached ({}/{})",
209                    count, max
210                )));
211            }
212        }
213        Ok(())
214    }
215}
216
217// ============================================================================
218// Handler Implementation
219// ============================================================================
220
221/// Handler for providing raw TCP networking access to WebAssembly actors.
222#[derive(Clone)]
223pub struct TcpHandler {
224    config: TcpHandlerConfig,
225    /// Shared state across all handler instances - enables connection transfer
226    shared_state: Arc<SharedTcpState>,
227    /// Actor ID for this handler instance - set during setup
228    actor_id: Arc<std::sync::Mutex<Option<TheaterId>>>,
229    /// Actor handle for calling export functions (set in setup, used by listen)
230    actor_handle: Arc<std::sync::Mutex<Option<ActorHandle>>>,
231    /// Cancellation token for spawned background tasks
232    cancellation_token: CancellationToken,
233    /// TLS context for encrypted connections (shared across clones)
234    tls_context: Arc<Option<TlsContext>>,
235}
236
237impl TcpHandler {
238    pub fn new(config: TcpHandlerConfig) -> Self {
239        // Build TLS context from config
240        let tls_context = match TlsContext::from_config(&config) {
241            Ok(ctx) => Arc::new(ctx),
242            Err(e) => {
243                error!("Failed to build TLS context: {}. TLS will be disabled.", e);
244                Arc::new(None)
245            }
246        };
247
248        Self {
249            config,
250            shared_state: Arc::new(SharedTcpState::new(None)),
251            actor_id: Arc::new(std::sync::Mutex::new(None)),
252            actor_handle: Arc::new(std::sync::Mutex::new(None)),
253            cancellation_token: CancellationToken::new(),
254            tls_context,
255        }
256    }
257
258    /// Get the interface declarations for this handler.
259    pub fn interfaces(&self) -> Vec<InterfaceImpl> {
260        vec![tcp_interface()]
261    }
262}
263
264// ── Value parsing helpers ─────────────────────────────────────────────────
265
266fn parse_string(input: &Value) -> Result<String, Value> {
267    match input {
268        Value::String(s) => Ok(s.clone()),
269        Value::Tuple(fields) if fields.len() == 1 => match &fields[0] {
270            Value::String(s) => Ok(s.clone()),
271            _ => Err(Value::String("Expected string".to_string())),
272        },
273        _ => Err(Value::String("Expected string".to_string())),
274    }
275}
276
277fn parse_two_strings(input: &Value) -> Result<(String, String), Value> {
278    match input {
279        Value::Tuple(fields) if fields.len() == 2 => {
280            let a = match &fields[0] {
281                Value::String(s) => s.clone(),
282                _ => return Err(Value::String("Expected string for first arg".to_string())),
283            };
284            let b = match &fields[1] {
285                Value::String(s) => s.clone(),
286                _ => return Err(Value::String("Expected string for second arg".to_string())),
287            };
288            Ok((a, b))
289        }
290        _ => Err(Value::String("Expected tuple (string, string)".to_string())),
291    }
292}
293
294fn parse_string_and_bytes(input: &Value) -> Result<(String, Vec<u8>), Value> {
295    match input {
296        Value::Tuple(fields) if fields.len() == 2 => {
297            let id = match &fields[0] {
298                Value::String(s) => s.clone(),
299                _ => return Err(Value::String("Expected string for id".to_string())),
300            };
301            let data = match &fields[1] {
302                Value::List { items, .. } => items
303                    .iter()
304                    .filter_map(|v| match v {
305                        Value::U8(b) => Some(*b),
306                        _ => None,
307                    })
308                    .collect::<Vec<u8>>(),
309                _ => return Err(Value::String("Expected list<u8> for data".to_string())),
310            };
311            Ok((id, data))
312        }
313        _ => Err(Value::String("Expected tuple (id, data)".to_string())),
314    }
315}
316
317fn parse_string_and_u32(input: &Value) -> Result<(String, u32), Value> {
318    match input {
319        Value::Tuple(fields) if fields.len() == 2 => {
320            let id = match &fields[0] {
321                Value::String(s) => s.clone(),
322                _ => return Err(Value::String("Expected string for id".to_string())),
323            };
324            let n = match &fields[1] {
325                Value::U32(n) => *n,
326                _ => return Err(Value::String("Expected u32".to_string())),
327            };
328            Ok((id, n))
329        }
330        _ => Err(Value::String("Expected tuple (id, u32)".to_string())),
331    }
332}
333
334fn id_to_string(id: u64) -> String {
335    id.to_string()
336}
337
338fn string_to_id(s: &str) -> Result<u64, Value> {
339    s.parse::<u64>()
340        .map_err(|_| Value::String(format!("Invalid id: {}", s)))
341}
342
343// ── Handler implementation ────────────────────────────────────────────────
344
345impl Handler for TcpHandler {
346    fn create_instance(&self, config: Option<&HandlerConfig>) -> Box<dyn Handler> {
347        let tcp_config = match config {
348            Some(HandlerConfig::Tcp { config }) => config.clone(),
349            _ => self.config.clone(),
350        };
351
352        // Build TLS context from config if different from current
353        let tls_context = if config.is_some() {
354            // New config provided, rebuild TLS context
355            match TlsContext::from_config(&tcp_config) {
356                Ok(ctx) => Arc::new(ctx),
357                Err(e) => {
358                    error!("Failed to build TLS context: {}. TLS will be disabled.", e);
359                    Arc::new(None)
360                }
361            }
362        } else {
363            // Reuse existing TLS context
364            self.tls_context.clone()
365        };
366
367        // Share the same state across all instances - this is the key for transfer!
368        // Each instance gets its own cancellation token (cancelled when that actor shuts down)
369        Box::new(TcpHandler {
370            config: tcp_config,
371            shared_state: self.shared_state.clone(), // Same Arc!
372            actor_id: Arc::new(std::sync::Mutex::new(None)),
373            actor_handle: Arc::new(std::sync::Mutex::new(None)),
374            cancellation_token: CancellationToken::new(),
375            tls_context,
376        })
377    }
378
379    fn name(&self) -> &str {
380        "tcp"
381    }
382
383    fn imports(&self) -> Option<Vec<String>> {
384        Some(
385            self.interfaces()
386                .iter()
387                .map(|i| i.name().to_string())
388                .collect(),
389        )
390    }
391
392    fn exports(&self) -> Option<Vec<String>> {
393        Some(vec!["theater:simple/tcp-client".to_string()])
394    }
395
396    fn interface_hashes(&self) -> Vec<(String, TypeHash)> {
397        self.interfaces()
398            .iter()
399            .map(|i| (i.name().to_string(), i.hash()))
400            .collect()
401    }
402
403    fn interfaces(&self) -> Vec<theater::pack_bridge::InterfaceImpl> {
404        vec![tcp_interface()]
405    }
406
407    fn setup(
408        &mut self,
409        actor_handle: ActorHandle,
410        _actor_instance: SharedActorInstance,
411        shutdown_receiver: ShutdownReceiver,
412        _event_rx: theater::handler::HandlerEventReceiver,
413    ) -> Pin<Box<dyn Future<Output = anyhow::Result<()>> + Send>> {
414        info!("TCP handler setup (passive mode)");
415
416        // Store the actor_handle for use by listen()
417        {
418            let mut handle_guard = self.actor_handle.lock().unwrap();
419            *handle_guard = Some(actor_handle);
420        }
421
422        // Get cancellation token to cancel on shutdown
423        let cancel_token = self.cancellation_token.clone();
424        let shared_state = self.shared_state.clone();
425        let actor_id_for_cleanup = self.actor_id.clone();
426
427        // Wait for shutdown, then clean up all resources OWNED BY THIS ACTOR.
428        // The connections + listeners maps are shared across every TcpHandler
429        // instance in the runtime — one map, many actors. So the cleanup must
430        // filter by owner; clearing the whole map would wipe in-flight
431        // connections owned by other actors. For TLS connections it also
432        // drives `AsyncWriteExt::shutdown` so close_notify reaches the wire
433        // before TCP FIN (the same shape as the `close` host function).
434        Box::pin(async move {
435            info!("TCP handler setup waiting for shutdown signal");
436            shutdown_receiver.wait_for_shutdown().await;
437            info!("TCP handler received shutdown, cleaning up resources");
438
439            // Cancel all spawned background tasks (listeners, active mode readers)
440            cancel_token.cancel();
441            info!("TCP handler cancellation token cancelled");
442
443            let actor_id_val = *actor_id_for_cleanup.lock().unwrap();
444
445            // Collect connection IDs owned by this actor; release the outer
446            // lock before driving per-connection shutdowns (those acquire
447            // the inner stream mutex and may await).
448            let to_remove: Vec<u64> = {
449                let connections = shared_state.connections.lock().await;
450                connections
451                    .iter()
452                    .filter(|(_, entry)| Some(entry.owner) == actor_id_val)
453                    .map(|(id, _)| *id)
454                    .collect()
455            };
456            let conn_count = to_remove.len();
457            for conn_id in to_remove {
458                shutdown_write_half_and_remove(&shared_state, conn_id).await;
459            }
460            if conn_count > 0 {
461                info!(
462                    "TCP handler closed {} connections owned by actor {:?}",
463                    conn_count, actor_id_val
464                );
465            }
466
467            // Same owner-filter for listeners.
468            {
469                let mut listeners = shared_state.listeners.lock().await;
470                let before = listeners.len();
471                listeners.retain(|_, entry| Some(entry.owner) != actor_id_val);
472                let removed = before - listeners.len();
473                if removed > 0 {
474                    info!(
475                        "TCP handler closed {} listeners owned by actor {:?}",
476                        removed, actor_id_val
477                    );
478                }
479            }
480
481            info!("TCP handler shutdown complete");
482            Ok(())
483        })
484    }
485
486    fn setup_host_functions_composite(
487        &mut self,
488        builder: &mut HostLinkerBuilder<'_, ActorStore>,
489        ctx: &mut HandlerContext,
490    ) -> Result<(), LinkerError> {
491        info!("Setting up TCP host functions (Pack)");
492
493        if ctx.is_satisfied("theater:simple/tcp") {
494            info!("theater:simple/tcp already satisfied, skipping");
495            return Ok(());
496        }
497
498        // Get actor ID from context
499        let actor_id = ctx
500            .actor_id
501            .expect("actor_id should be set in HandlerContext");
502
503        // Store actor_id for this instance
504        {
505            let mut id_guard = self.actor_id.lock().unwrap();
506            *id_guard = Some(actor_id);
507        }
508
509        // Update max_connections if configured
510        // Note: We can't easily update the shared state's max_connections here
511        // since it's already created. For now, first handler wins.
512
513        let state = self.shared_state.clone();
514        let actor_id_for_closures = actor_id;
515
516        // Clone handler fields for use in listen() callback
517        let actor_handle_for_listen = self.actor_handle.clone();
518        let cancel_token_for_listen = self.cancellation_token.clone();
519
520        // Clone state and actor_id for each closure
521        let st_connect = state.clone();
522        let aid_connect = actor_id_for_closures;
523        let tls_for_connect = self.tls_context.clone();
524
525        let st_listen = state.clone();
526        let aid_listen = actor_id_for_closures;
527        let tls_for_listen = self.tls_context.clone();
528
529        let st_accept = state.clone();
530        let aid_accept = actor_id_for_closures;
531        let tls_for_accept = self.tls_context.clone();
532
533        let st_activate = state.clone();
534        let aid_activate = actor_id_for_closures;
535
536        let st_set_active = state.clone();
537        let aid_set_active = actor_id_for_closures;
538        let actor_handle_for_set_active = self.actor_handle.clone();
539        let cancel_token_for_set_active = self.cancellation_token.clone();
540
541        let st_transfer = state.clone();
542        let aid_transfer = actor_id_for_closures;
543
544        let st_peer = state.clone();
545        let aid_peer = actor_id_for_closures;
546
547        let st_send = state.clone();
548        let aid_send = actor_id_for_closures;
549
550        let st_receive = state.clone();
551        let aid_receive = actor_id_for_closures;
552        let cancel_token_for_receive = self.cancellation_token.clone();
553
554        let st_close = state.clone();
555        let aid_close = actor_id_for_closures;
556
557        let st_upgrade_server = state.clone();
558        let aid_upgrade_server = actor_id_for_closures;
559        let tls_for_upgrade_server = self.tls_context.clone();
560
561        let st_upgrade_client = state.clone();
562        let aid_upgrade_client = actor_id_for_closures;
563        let tls_for_upgrade_client = self.tls_context.clone();
564
565        let st_close_listener = state.clone();
566        let aid_close_listener = actor_id_for_closures;
567
568        builder
569            .interface("theater:simple/tcp")?
570            // ----------------------------------------------------------------
571            // connect(address: string) -> result<string, string>
572            // Outbound connections are immediately active
573            // ----------------------------------------------------------------
574            .func_async_result(
575                "connect",
576                move |_ctx: AsyncCtx<ActorStore>, input: Value| {
577                    let st = st_connect.clone();
578                    let actor_id = aid_connect;
579                    let tls_ctx = tls_for_connect.clone();
580                    async move {
581                        let _ph = PhaseLog::new("tcp.connect");
582                        let address = parse_string(&input)?;
583                        st.check_connection_limit().await?;
584                        debug!("tcp connect to {}", address);
585
586                        let tcp_stream = TcpStream::connect(&address)
587                            .await
588                            .map_err(|e| Value::String(e.to_string()))?;
589
590                        let peer_addr = tcp_stream
591                            .peer_addr()
592                            .map_err(|e| Value::String(e.to_string()))?;
593
594                        // Apply TLS if configured AND auto_handshake is enabled.
595                        // STARTTLS-style protocols set client_tls.auto_handshake = false:
596                        // the connector is built (so upgrade-to-tls-client works) but
597                        // connect() returns a plain TCP stream until the actor explicitly
598                        // upgrades it after negotiating the STARTTLS handshake.
599                        let auto_handshake = tls_ctx
600                            .as_ref()
601                            .as_ref()
602                            .map(|c| c.client_auto_handshake)
603                            .unwrap_or(true);
604                        let unified_stream = if !auto_handshake {
605                            UnifiedStream::Plain(tcp_stream)
606                        } else if let Some(ref ctx) = *tls_ctx {
607                            if let Some(ref connector) = ctx.client_connector {
608                                // Extract hostname from address for SNI
609                                let server_name = tls::parse_server_name(
610                                    address.split(':').next().unwrap_or(&address),
611                                )
612                                .map_err(|e| Value::String(e.to_string()))?;
613
614                                debug!("tcp connect: performing TLS handshake with SNI {:?}", server_name);
615                                let tls_stream = connector
616                                    .connect(server_name, tcp_stream)
617                                    .await
618                                    .map_err(|e| Value::String(format!("TLS handshake failed: {}", e)))?;
619                                info!("tcp connect: TLS handshake complete");
620                                UnifiedStream::ClientTls(tls_stream)
621                            } else {
622                                UnifiedStream::Plain(tcp_stream)
623                            }
624                        } else {
625                            UnifiedStream::Plain(tcp_stream)
626                        };
627
628                        let id = st.next_id();
629                        st.connections.lock().await.insert(
630                            id,
631                            ConnectionEntry {
632                                stream: Arc::new(Mutex::new(StreamState::Full(Box::new(
633                                    unified_stream,
634                                )))),
635                                peer_addr,
636                                owner: actor_id,
637                                state: ConnectionState::Active, // Outbound = active
638                                data_mode: DataMode::Passive,
639                            },
640                        );
641                        debug!("tcp connected to {} as conn={}", address, id);
642                        Ok::<Value, Value>(Value::String(id_to_string(id)))
643                    }
644                },
645            )?
646            // ----------------------------------------------------------------
647            // listen(address: string) -> result<string, string>
648            // Binds a listener and spawns a background accept loop
649            // ----------------------------------------------------------------
650            .func_async_result(
651                "listen",
652                move |_ctx: AsyncCtx<ActorStore>, input: Value| {
653                    let st = st_listen.clone();
654                    let actor_id = aid_listen;
655                    let actor_handle_arc = actor_handle_for_listen.clone();
656                    let cancel_token = cancel_token_for_listen.clone();
657                    let tls_ctx = tls_for_listen.clone();
658
659                    async move {
660                        let _ph = PhaseLog::new("tcp.listen");
661                        let address = parse_string(&input)?;
662                        debug!("tcp listen on {}", address);
663
664                        let listener = TcpListener::bind(&address)
665                            .await
666                            .map_err(|e| Value::String(e.to_string()))?;
667
668                        let listener_id = st.next_id();
669                        let has_tls = match tls_ctx.as_ref() {
670                            Some(ctx) => ctx.server_acceptor.is_some(),
671                            None => false,
672                        };
673                        info!(
674                            "tcp listening on {} as listener={} (tls={})",
675                            address, listener_id, has_tls
676                        );
677
678                        // Take the actor_handle for use in the accept loop
679                        let actor_handle = {
680                            let guard = actor_handle_arc.lock().unwrap();
681                            guard.clone()
682                        };
683                        let Some(actor_handle) = actor_handle else {
684                            return Err(Value::String(
685                                "Actor handle not available - setup() not called?".to_string(),
686                            ));
687                        };
688
689                        // Clone state for the background task
690                        let st_for_task = st.clone();
691                        let actor_id_for_task = actor_id;
692
693                        // Spawn background accept loop with cancellation support
694                        tokio::spawn(async move {
695                            info!("TCP accept loop started for listener={}", listener_id);
696
697                            loop {
698                                tokio::select! {
699                                    _ = cancel_token.cancelled() => {
700                                        info!("TCP accept loop cancelled for listener={}", listener_id);
701                                        break;
702                                    }
703                                    result = listener.accept() => {
704                                        match result {
705                                            Ok((tcp_stream, peer_addr)) => {
706                                                let conn_id = st_for_task.next_id();
707                                                info!(
708                                                    "tcp accepted conn={} from {} on listener={}",
709                                                    conn_id, peer_addr, listener_id
710                                                );
711
712                                                // Apply TLS if configured
713                                                let unified_stream = if let Some(ref ctx) = *tls_ctx {
714                                                    if let Some(ref acceptor) = ctx.server_acceptor {
715                                                        debug!("tcp accept: performing TLS handshake for conn={}", conn_id);
716                                                        match acceptor.accept(tcp_stream).await {
717                                                            Ok(tls_stream) => {
718                                                                info!("tcp accept: TLS handshake complete for conn={}", conn_id);
719                                                                UnifiedStream::ServerTls(tls_stream)
720                                                            }
721                                                            Err(e) => {
722                                                                error!("TLS handshake failed for conn={}: {}", conn_id, e);
723                                                                continue; // Skip this connection
724                                                            }
725                                                        }
726                                                    } else {
727                                                        UnifiedStream::Plain(tcp_stream)
728                                                    }
729                                                } else {
730                                                    UnifiedStream::Plain(tcp_stream)
731                                                };
732
733                                                // Store connection in PENDING state
734                                                st_for_task.connections.lock().await.insert(
735                                                    conn_id,
736                                                    ConnectionEntry {
737                                                        stream: Arc::new(Mutex::new(
738                                                            StreamState::Full(Box::new(unified_stream)),
739                                                        )),
740                                                        peer_addr,
741                                                        owner: actor_id_for_task,
742                                                        state: ConnectionState::Pending,
743                                                        data_mode: DataMode::Passive,
744                                                    },
745                                                );
746
747                                                // Detach so a slow/blocked handle-connection in the actor
748                                                // cannot wedge the accept loop and saturate the kernel SYN queue.
749                                                let conn_id_str = id_to_string(conn_id);
750                                                let params =
751                                                    Value::Tuple(vec![Value::String(conn_id_str)]);
752                                                let actor_handle_for_call = actor_handle.clone();
753                                                let st_for_cleanup = st_for_task.clone();
754                                                tokio::spawn(async move {
755                                                    if let Err(e) = actor_handle_for_call
756                                                        .call_function(
757                                                            "theater:simple/tcp-client.handle-connection"
758                                                                .to_string(),
759                                                            params,
760                                                        )
761                                                        .await
762                                                    {
763                                                        error!(
764                                                            "Failed to call handle-connection for conn={}: {}",
765                                                            conn_id, e
766                                                        );
767                                                        // Clean up the pending connection
768                                                        st_for_cleanup
769                                                            .connections
770                                                            .lock()
771                                                            .await
772                                                            .remove(&conn_id);
773                                                    }
774                                                });
775                                            }
776                                            Err(e) => {
777                                                error!(
778                                                    "TCP accept error on listener={}: {}",
779                                                    listener_id, e
780                                                );
781                                            }
782                                        }
783                                    }
784                                }
785                            }
786
787                            info!("TCP accept loop stopped for listener={}", listener_id);
788                        });
789
790                        Ok::<Value, Value>(Value::String(id_to_string(listener_id)))
791                    }
792                },
793            )?
794            // ----------------------------------------------------------------
795            // accept(listener-id: string) -> result<string, string>
796            // Manual accept - returns connection in PENDING state
797            // ----------------------------------------------------------------
798            .func_async_result(
799                "accept",
800                move |_ctx: AsyncCtx<ActorStore>, input: Value| {
801                    let st = st_accept.clone();
802                    let actor_id = aid_accept;
803                    let tls_ctx = tls_for_accept.clone();
804                    async move {
805                        let _ph = PhaseLog::new("tcp.accept");
806                        let listener_id_str = parse_string(&input)?;
807                        let listener_id = string_to_id(&listener_id_str)?;
808                        debug!("tcp accept on listener={}", listener_id);
809
810                        // Check ownership
811                        let mut listeners = st.listeners.lock().await;
812                        let entry = listeners.get_mut(&listener_id).ok_or_else(|| {
813                            Value::String(format!("Listener not found: {}", listener_id_str))
814                        })?;
815
816                        if entry.owner != actor_id {
817                            return Err(Value::String(format!(
818                                "Listener {} not owned by this actor",
819                                listener_id_str
820                            )));
821                        }
822
823                        let (tcp_stream, peer_addr) = entry
824                            .listener
825                            .accept()
826                            .await
827                            .map_err(|e| Value::String(e.to_string()))?;
828
829                        let conn_id = st.next_id();
830                        drop(listeners); // Release lock before acquiring connections lock
831
832                        // Apply TLS if configured
833                        let unified_stream = if let Some(ref ctx) = *tls_ctx {
834                            if let Some(ref acceptor) = ctx.server_acceptor {
835                                debug!("tcp manual accept: performing TLS handshake for conn={}", conn_id);
836                                let tls_stream = acceptor
837                                    .accept(tcp_stream)
838                                    .await
839                                    .map_err(|e| Value::String(format!("TLS handshake failed: {}", e)))?;
840                                info!("tcp manual accept: TLS handshake complete for conn={}", conn_id);
841                                UnifiedStream::ServerTls(tls_stream)
842                            } else {
843                                UnifiedStream::Plain(tcp_stream)
844                            }
845                        } else {
846                            UnifiedStream::Plain(tcp_stream)
847                        };
848
849                        st.connections.lock().await.insert(
850                            conn_id,
851                            ConnectionEntry {
852                                stream: Arc::new(Mutex::new(StreamState::Full(Box::new(
853                                    unified_stream,
854                                )))),
855                                peer_addr,
856                                owner: actor_id,
857                                state: ConnectionState::Pending, // Starts pending!
858                                data_mode: DataMode::Passive,
859                            },
860                        );
861                        debug!("tcp accepted conn={} from {} (pending)", conn_id, peer_addr);
862                        Ok::<Value, Value>(Value::String(id_to_string(conn_id)))
863                    }
864                },
865            )?
866            // ----------------------------------------------------------------
867            // activate(connection-id: string) -> result<_, string>
868            // Activate a pending connection for this actor
869            // ----------------------------------------------------------------
870            .func_async_result(
871                "activate",
872                move |_ctx: AsyncCtx<ActorStore>, input: Value| {
873                    let st = st_activate.clone();
874                    let actor_id = aid_activate;
875                    async move {
876                        let _ph = PhaseLog::new("tcp.activate");
877                        let conn_id_str = parse_string(&input)?;
878                        let conn_id = string_to_id(&conn_id_str)?;
879
880                        let mut connections = st.connections.lock().await;
881                        let entry = connections.get_mut(&conn_id).ok_or_else(|| {
882                            Value::String(format!("Connection not found: {}", conn_id_str))
883                        })?;
884
885                        if entry.owner != actor_id {
886                            return Err(Value::String(format!(
887                                "Connection {} not owned by this actor",
888                                conn_id_str
889                            )));
890                        }
891
892                        if entry.state == ConnectionState::Active {
893                            return Err(Value::String(format!(
894                                "Connection {} is already active",
895                                conn_id_str
896                            )));
897                        }
898
899                        entry.state = ConnectionState::Active;
900                        debug!("tcp activated conn={}", conn_id);
901                        Ok::<Value, Value>(Value::Tuple(vec![]))
902                    }
903                },
904            )?
905            // ----------------------------------------------------------------
906            // set-active(connection-id: string, mode: string) -> result<_, string>
907            // Set data mode: "passive", "active", or "once"
908            // ----------------------------------------------------------------
909            .func_async_result(
910                "set-active",
911                move |_ctx: AsyncCtx<ActorStore>, input: Value| {
912                    let st = st_set_active.clone();
913                    let actor_id = aid_set_active;
914                    let actor_handle_arc = actor_handle_for_set_active.clone();
915                    let cancel_token = cancel_token_for_set_active.clone();
916                    async move {
917                        let _ph = PhaseLog::new("tcp.set_active");
918                        let (conn_id_str, mode_str) = parse_two_strings(&input)?;
919                        let conn_id = string_to_id(&conn_id_str)?;
920
921                        let new_mode = match mode_str.as_str() {
922                            "passive" => DataMode::Passive,
923                            "active" => DataMode::Active,
924                            "once" => DataMode::Once,
925                            _ => {
926                                return Err(Value::String(format!(
927                                    "Invalid mode '{}': expected 'passive', 'active', or 'once'",
928                                    mode_str
929                                )));
930                            }
931                        };
932
933                        let mut connections = st.connections.lock().await;
934                        let entry = connections.get_mut(&conn_id).ok_or_else(|| {
935                            Value::String(format!("Connection not found: {}", conn_id_str))
936                        })?;
937
938                        if entry.owner != actor_id {
939                            return Err(Value::String(format!(
940                                "Connection {} not owned by this actor",
941                                conn_id_str
942                            )));
943                        }
944
945                        if entry.state != ConnectionState::Active {
946                            return Err(Value::String(format!(
947                                "Connection {} must be activated before setting data mode",
948                                conn_id_str
949                            )));
950                        }
951
952                        let old_mode = entry.data_mode;
953
954                        // Handle mode transitions
955                        match (old_mode, new_mode) {
956                            (DataMode::Passive, DataMode::Active | DataMode::Once) => {
957                                // Transitioning to active/once mode - split stream and spawn reader
958                                let mut stream_guard = entry.stream.lock().await;
959                                let stream = std::mem::replace(&mut *stream_guard, StreamState::Closed);
960                                let full_stream = match stream {
961                                    StreamState::Full(s) => s,
962                                    other @ StreamState::WriteOnly(_) => {
963                                        // Restore — we replaced with Closed above.
964                                        *stream_guard = other;
965                                        return Err(Value::String(format!(
966                                            "Connection {} is already in active mode",
967                                            conn_id_str
968                                        )));
969                                    }
970                                    StreamState::Closed => {
971                                        return Err(Value::String(format!(
972                                            "Connection {} is closed",
973                                            conn_id_str
974                                        )));
975                                    }
976                                };
977
978                                let (read_half, write_half) = full_stream.into_split();
979                                *stream_guard = StreamState::WriteOnly(write_half);
980                                drop(stream_guard);
981                                entry.data_mode = new_mode;
982
983                                // Get actor handle for callbacks
984                                let actor_handle = {
985                                    let guard = actor_handle_arc.lock().unwrap();
986                                    guard.clone()
987                                };
988                                let Some(actor_handle) = actor_handle else {
989                                    return Err(Value::String(
990                                        "Actor handle not available".to_string(),
991                                    ));
992                                };
993
994                                // Spawn background read task with cancellation support
995                                let conn_id_for_task = conn_id;
996                                let st_for_task = st.clone();
997                                let is_once = new_mode == DataMode::Once;
998                                let cancel_token_for_task = cancel_token.clone();
999
1000                                tokio::spawn(async move {
1001                                    tcp_read_loop(
1002                                        conn_id_for_task,
1003                                        read_half,
1004                                        actor_handle,
1005                                        st_for_task,
1006                                        is_once,
1007                                        cancel_token_for_task,
1008                                    )
1009                                    .await;
1010                                });
1011
1012                                info!(
1013                                    "tcp conn={} set to {} mode, read loop spawned",
1014                                    conn_id, mode_str
1015                                );
1016                            }
1017                            (DataMode::Active | DataMode::Once, DataMode::Passive) => {
1018                                // Can't go back to passive once in active mode (stream is split)
1019                                return Err(Value::String(format!(
1020                                    "Cannot switch connection {} back to passive mode (stream is split)",
1021                                    conn_id_str
1022                                )));
1023                            }
1024                            (DataMode::Active, DataMode::Once) | (DataMode::Once, DataMode::Active) => {
1025                                // Can't switch between active and once (would need to stop/restart reader)
1026                                return Err(Value::String(format!(
1027                                    "Cannot switch connection {} between active and once modes",
1028                                    conn_id_str
1029                                )));
1030                            }
1031                            _ => {
1032                                // Same mode, no-op
1033                                debug!("tcp conn={} already in {} mode", conn_id, mode_str);
1034                            }
1035                        }
1036
1037                        Ok::<Value, Value>(Value::Tuple(vec![]))
1038                    }
1039                },
1040            )?
1041            // ----------------------------------------------------------------
1042            // transfer(connection-id: string, target-actor: string) -> result<_, string>
1043            // Transfer connection to another actor (and activate it)
1044            // ----------------------------------------------------------------
1045            .func_async_result(
1046                "transfer",
1047                move |ctx: AsyncCtx<ActorStore>, input: Value| {
1048                    let st = st_transfer.clone();
1049                    let actor_id = aid_transfer;
1050                    async move {
1051                        let _ph = PhaseLog::new("tcp.transfer");
1052                        let (conn_id_str, target_actor_str) = parse_two_strings(&input)?;
1053                        let conn_id = string_to_id(&conn_id_str)?;
1054
1055                        let target_actor: TheaterId = target_actor_str
1056                            .parse()
1057                            .map_err(|e| Value::String(format!("Invalid actor ID: {}", e)))?;
1058
1059                        {
1060                            let mut connections = st.connections.lock().await;
1061                            let entry = connections.get_mut(&conn_id).ok_or_else(|| {
1062                                Value::String(format!("Connection not found: {}", conn_id_str))
1063                            })?;
1064
1065                            if entry.owner != actor_id {
1066                                return Err(Value::String(format!(
1067                                    "Connection {} not owned by this actor",
1068                                    conn_id_str
1069                                )));
1070                            }
1071
1072                            // Transfer ownership and activate
1073                            let old_owner = entry.owner;
1074                            entry.owner = target_actor;
1075                            entry.state = ConnectionState::Active;
1076
1077                            info!(
1078                                "tcp transferred conn={} from {} to {} (now active)",
1079                                conn_id, old_owner, target_actor
1080                            );
1081                        }
1082
1083                        // Get target actor's handle and call handle-connection-transfer
1084                        let store = ctx.data();
1085                        let theater_tx = store.theater_tx.clone();
1086
1087                        let (handle_tx, handle_rx) = tokio::sync::oneshot::channel();
1088                        let get_handle_cmd = theater::messages::TheaterCommand::GetActorHandle {
1089                            actor_id: target_actor,
1090                            response_tx: handle_tx,
1091                        };
1092                        theater_tx.send(get_handle_cmd).await
1093                            .map_err(|e| Value::String(format!("Failed to get target handle: {}", e)))?;
1094
1095                        let target_handle = match handle_rx.await {
1096                            Ok(Some(handle)) => handle,
1097                            Ok(None) => return Err(Value::String("Target actor handle not found".to_string())),
1098                            Err(e) => return Err(Value::String(format!("Failed to receive handle: {}", e))),
1099                        };
1100
1101                        // Call handle-connection-transfer on target
1102                        // Just pass conn_id - runtime will prepend state to make (state, conn_id)
1103                        let params = Value::String(conn_id_str.clone());
1104                        if let Err(e) = target_handle
1105                            .call_function(
1106                                "theater:simple/tcp-client.handle-connection-transfer".to_string(),
1107                                params,
1108                            )
1109                            .await
1110                        {
1111                            warn!("Failed to call handle-connection-transfer: {:?}", e);
1112                            // Don't fail the transfer, just log the warning
1113                        }
1114
1115                        Ok::<Value, Value>(Value::Tuple(vec![]))
1116                    }
1117                },
1118            )?
1119            // ----------------------------------------------------------------
1120            // peer-address(connection-id: string) -> result<string, string>
1121            // Get peer address (works in pending or active state)
1122            // ----------------------------------------------------------------
1123            .func_async_result(
1124                "peer-address",
1125                move |_ctx: AsyncCtx<ActorStore>, input: Value| {
1126                    let st = st_peer.clone();
1127                    let actor_id = aid_peer;
1128                    async move {
1129                        let _ph = PhaseLog::new("tcp.peer_address");
1130                        let conn_id_str = parse_string(&input)?;
1131                        let conn_id = string_to_id(&conn_id_str)?;
1132
1133                        let connections = st.connections.lock().await;
1134                        let entry = connections.get(&conn_id).ok_or_else(|| {
1135                            Value::String(format!("Connection not found: {}", conn_id_str))
1136                        })?;
1137
1138                        if entry.owner != actor_id {
1139                            return Err(Value::String(format!(
1140                                "Connection {} not owned by this actor",
1141                                conn_id_str
1142                            )));
1143                        }
1144
1145                        Ok::<Value, Value>(Value::String(entry.peer_addr.to_string()))
1146                    }
1147                },
1148            )?
1149            // ----------------------------------------------------------------
1150            // send(connection-id: string, data: list<u8>) -> result<u64, string>
1151            // ----------------------------------------------------------------
1152            .func_async_result(
1153                "send",
1154                move |_ctx: AsyncCtx<ActorStore>, input: Value| {
1155                    let st = st_send.clone();
1156                    let actor_id = aid_send;
1157                    async move {
1158                        let _ph = PhaseLog::new("tcp.send");
1159                        let (conn_id_str, data) = parse_string_and_bytes(&input)?;
1160                        let conn_id = string_to_id(&conn_id_str)?;
1161                        let len = data.len();
1162
1163                        // Lock the outer map only long enough to validate metadata
1164                        // and clone the per-connection stream Arc. The actual I/O
1165                        // runs without holding the outer lock — that lets two
1166                        // actors do I/O on different connections in parallel.
1167                        let stream_arc = {
1168                            let connections = st.connections.lock().await;
1169                            let entry = connections.get(&conn_id).ok_or_else(|| {
1170                                Value::String(format!("Connection not found: {}", conn_id_str))
1171                            })?;
1172
1173                            if entry.owner != actor_id {
1174                                return Err(Value::String(format!(
1175                                    "Connection {} not owned by this actor",
1176                                    conn_id_str
1177                                )));
1178                            }
1179
1180                            if entry.state == ConnectionState::Pending {
1181                                return Err(Value::String(format!(
1182                                    "Connection {} is pending - call activate() or transfer() first",
1183                                    conn_id_str
1184                                )));
1185                            }
1186
1187                            entry.stream.clone()
1188                        };
1189
1190                        let mut stream_guard = stream_arc.lock().await;
1191                        match &mut *stream_guard {
1192                            StreamState::Full(stream) => {
1193                                stream
1194                                    .write_all(&data)
1195                                    .await
1196                                    .map_err(|e| Value::String(e.to_string()))?;
1197                            }
1198                            StreamState::WriteOnly(write_half) => {
1199                                write_half
1200                                    .write_all(&data)
1201                                    .await
1202                                    .map_err(|e| Value::String(e.to_string()))?;
1203                            }
1204                            StreamState::Closed => {
1205                                return Err(Value::String(format!(
1206                                    "Connection {} is closed",
1207                                    conn_id_str
1208                                )));
1209                            }
1210                        }
1211
1212                        debug!("tcp send conn={} {} bytes", conn_id, len);
1213                        Ok::<Value, Value>(Value::U64(len as u64))
1214                    }
1215                },
1216            )?
1217            // ----------------------------------------------------------------
1218            // receive(connection-id: string, max-bytes: u32) -> result<list<u8>, string>
1219            // ----------------------------------------------------------------
1220            .func_async_result(
1221                "receive",
1222                move |_ctx: AsyncCtx<ActorStore>, input: Value| {
1223                    let st = st_receive.clone();
1224                    let actor_id = aid_receive;
1225                    let cancel_token = cancel_token_for_receive.clone();
1226                    async move {
1227                        let _ph = PhaseLog::new("tcp.receive");
1228                        let (conn_id_str, max_bytes) = parse_string_and_u32(&input)?;
1229                        let conn_id = string_to_id(&conn_id_str)?;
1230
1231                        // Lock the outer map only long enough to validate metadata
1232                        // and clone the per-connection stream Arc. The actual read
1233                        // runs without holding the outer lock so other actors can
1234                        // do their own I/O concurrently on other connections.
1235                        let stream_arc = {
1236                            let connections = st.connections.lock().await;
1237                            let entry = connections.get(&conn_id).ok_or_else(|| {
1238                                Value::String(format!("Connection not found: {}", conn_id_str))
1239                            })?;
1240
1241                            if entry.owner != actor_id {
1242                                return Err(Value::String(format!(
1243                                    "Connection {} not owned by this actor",
1244                                    conn_id_str
1245                                )));
1246                            }
1247
1248                            if entry.state == ConnectionState::Pending {
1249                                return Err(Value::String(format!(
1250                                    "Connection {} is pending - call activate() or transfer() first",
1251                                    conn_id_str
1252                                )));
1253                            }
1254
1255                            if entry.data_mode != DataMode::Passive {
1256                                return Err(Value::String(format!(
1257                                    "Connection {} is in active mode - data is pushed via on-data callback",
1258                                    conn_id_str
1259                                )));
1260                            }
1261
1262                            entry.stream.clone()
1263                        };
1264
1265                        let mut stream_guard = stream_arc.lock().await;
1266                        let stream = match &mut *stream_guard {
1267                            StreamState::Full(stream) => stream,
1268                            StreamState::WriteOnly(_) => {
1269                                return Err(Value::String(format!(
1270                                    "Connection {} read half not available (in active mode)",
1271                                    conn_id_str
1272                                )));
1273                            }
1274                            StreamState::Closed => {
1275                                return Err(Value::String(format!(
1276                                    "Connection {} is closed",
1277                                    conn_id_str
1278                                )));
1279                            }
1280                        };
1281
1282                        let mut buf = vec![0u8; max_bytes as usize];
1283
1284                        // Use select to make the read interruptible on shutdown
1285                        let n = tokio::select! {
1286                            result = stream.read(&mut buf) => {
1287                                result.map_err(|e| Value::String(e.to_string()))?
1288                            }
1289                            _ = cancel_token.cancelled() => {
1290                                info!("TCP receive cancelled due to shutdown, conn={}", conn_id);
1291                                return Err(Value::String("Connection closed: actor shutting down".to_string()));
1292                            }
1293                        };
1294
1295                        debug!(
1296                            "tcp receive conn={} {} bytes (max={})",
1297                            conn_id, n, max_bytes
1298                        );
1299                        buf.truncate(n);
1300                        Ok::<Value, Value>(Value::List {
1301                            elem_type: ValueType::U8,
1302                            items: buf.into_iter().map(Value::U8).collect(),
1303                        })
1304                    }
1305                },
1306            )?
1307            // ----------------------------------------------------------------
1308            // close(connection-id: string) -> result<_, string>
1309            //
1310            // Gracefully shuts the write side of the stream before dropping —
1311            // for TLS streams this sends the close_notify alert so strict
1312            // clients (e.g. rustls) don't see an "unexpected EOF". Plain TCP
1313            // streams get a normal FIN.
1314            // ----------------------------------------------------------------
1315            .func_async_result(
1316                "close",
1317                move |_ctx: AsyncCtx<ActorStore>, input: Value| {
1318                    let st = st_close.clone();
1319                    let actor_id = aid_close;
1320                    async move {
1321                        let _ph = PhaseLog::new("tcp.close");
1322                        let conn_id_str = parse_string(&input)?;
1323                        let conn_id = string_to_id(&conn_id_str)?;
1324
1325                        // Take the stream out of the map first, holding the
1326                        // outer lock only long enough to validate ownership
1327                        // and remove the entry.
1328                        let stream_arc = {
1329                            let mut connections = st.connections.lock().await;
1330                            let entry = connections.get(&conn_id).ok_or_else(|| {
1331                                Value::String(format!("Connection not found: {}", conn_id_str))
1332                            })?;
1333                            if entry.owner != actor_id {
1334                                return Err(Value::String(format!(
1335                                    "Connection {} not owned by this actor",
1336                                    conn_id_str
1337                                )));
1338                            }
1339                            let arc = entry.stream.clone();
1340                            connections.remove(&conn_id);
1341                            arc
1342                        };
1343
1344                        // Move the stream out and call shutdown on the write
1345                        // side. Errors are non-fatal — peer may already have
1346                        // closed — but we log them at WARN so production can
1347                        // tell apart a clean close_notify-sent path from a
1348                        // shutdown failure that drops bytes (ticket #10).
1349                        let mut guard = stream_arc.lock().await;
1350                        let taken = std::mem::replace(&mut *guard, StreamState::Closed);
1351                        drop(guard);
1352                        let shutdown_result = match taken {
1353                            StreamState::Full(mut s) => {
1354                                Some(AsyncWriteExt::shutdown(&mut *s).await)
1355                            }
1356                            StreamState::WriteOnly(mut w) => {
1357                                Some(AsyncWriteExt::shutdown(&mut w).await)
1358                            }
1359                            StreamState::Closed => None,
1360                        };
1361                        match shutdown_result {
1362                            Some(Ok(())) => {
1363                                debug!("tcp close conn={} (graceful shutdown ok)", conn_id);
1364                            }
1365                            Some(Err(e)) => {
1366                                warn!(
1367                                    "tcp close conn={} shutdown error (close_notify may not have been sent): {}",
1368                                    conn_id, e
1369                                );
1370                            }
1371                            None => {
1372                                debug!("tcp close conn={} (already closed)", conn_id);
1373                            }
1374                        }
1375                        Ok::<Value, Value>(Value::Tuple(vec![]))
1376                    }
1377                },
1378            )?
1379            // ----------------------------------------------------------------
1380            // upgrade-to-tls-server(connection-id: string) -> result<_, string>
1381            //
1382            // For STARTTLS-style protocols: the actor accepts a plain TCP
1383            // connection, exchanges a few protocol lines, then calls this to
1384            // wrap the existing stream with TLS using the server_tls cert
1385            // configured on this handler. After this returns Ok, the same
1386            // connection-id transports TLS-encrypted bytes.
1387            // ----------------------------------------------------------------
1388            .func_async_result(
1389                "upgrade-to-tls-server",
1390                move |_ctx: AsyncCtx<ActorStore>, input: Value| {
1391                    let st = st_upgrade_server.clone();
1392                    let actor_id = aid_upgrade_server;
1393                    let tls_ctx = tls_for_upgrade_server.clone();
1394                    async move {
1395                        let _ph = PhaseLog::new("tcp.upgrade_to_tls_server");
1396                        let conn_id_str = parse_string(&input)?;
1397                        let conn_id = string_to_id(&conn_id_str)?;
1398
1399                        let acceptor = match tls_ctx.as_ref() {
1400                            Some(ctx) => match &ctx.server_acceptor {
1401                                Some(a) => a.clone(),
1402                                None => {
1403                                    return Err(Value::String(
1404                                        "server_tls not configured on this handler".into(),
1405                                    ))
1406                                }
1407                            },
1408                            None => {
1409                                return Err(Value::String(
1410                                    "server_tls not configured on this handler".into(),
1411                                ))
1412                            }
1413                        };
1414
1415                        let stream_arc = {
1416                            let connections = st.connections.lock().await;
1417                            let entry = connections.get(&conn_id).ok_or_else(|| {
1418                                Value::String(format!("Connection not found: {}", conn_id_str))
1419                            })?;
1420                            if entry.owner != actor_id {
1421                                return Err(Value::String(format!(
1422                                    "Connection {} not owned by this actor",
1423                                    conn_id_str
1424                                )));
1425                            }
1426                            if entry.state != ConnectionState::Active {
1427                                return Err(Value::String(format!(
1428                                    "Connection {} must be activated before TLS upgrade",
1429                                    conn_id_str
1430                                )));
1431                            }
1432                            if entry.data_mode != DataMode::Passive {
1433                                return Err(Value::String(format!(
1434                                    "Connection {} must be in passive mode for TLS upgrade",
1435                                    conn_id_str
1436                                )));
1437                            }
1438                            entry.stream.clone()
1439                        };
1440
1441                        let mut guard = stream_arc.lock().await;
1442                        let taken = std::mem::replace(&mut *guard, StreamState::Closed);
1443                        let inner = match taken {
1444                            StreamState::Full(boxed) => *boxed,
1445                            StreamState::WriteOnly(w) => {
1446                                *guard = StreamState::WriteOnly(w);
1447                                return Err(Value::String(format!(
1448                                    "Connection {} is split; TLS upgrade not supported",
1449                                    conn_id_str
1450                                )));
1451                            }
1452                            StreamState::Closed => {
1453                                return Err(Value::String(format!(
1454                                    "Connection {} is closed",
1455                                    conn_id_str
1456                                )));
1457                            }
1458                        };
1459                        let tcp = match inner {
1460                            UnifiedStream::Plain(tcp) => tcp,
1461                            other => {
1462                                *guard = StreamState::Full(Box::new(other));
1463                                return Err(Value::String(format!(
1464                                    "Connection {} is already TLS",
1465                                    conn_id_str
1466                                )));
1467                            }
1468                        };
1469
1470                        let tls_stream = match acceptor.accept(tcp).await {
1471                            Ok(s) => s,
1472                            Err(e) => {
1473                                // Stream is gone — leave entry as Closed.
1474                                return Err(Value::String(format!(
1475                                    "TLS server handshake failed: {}",
1476                                    e
1477                                )));
1478                            }
1479                        };
1480                        *guard = StreamState::Full(Box::new(UnifiedStream::ServerTls(tls_stream)));
1481                        drop(guard);
1482
1483                        debug!("tcp upgrade-to-tls-server conn={}", conn_id);
1484                        Ok::<Value, Value>(Value::Tuple(vec![]))
1485                    }
1486                },
1487            )?
1488            // ----------------------------------------------------------------
1489            // upgrade-to-tls-client(connection-id, server-name) -> result<_, string>
1490            //
1491            // The client-side mirror of upgrade-to-tls-server: wraps an
1492            // existing plain TCP connection with TLS using the client_tls
1493            // config. server-name is used for SNI and cert verification.
1494            // ----------------------------------------------------------------
1495            .func_async_result(
1496                "upgrade-to-tls-client",
1497                move |_ctx: AsyncCtx<ActorStore>, input: Value| {
1498                    let st = st_upgrade_client.clone();
1499                    let actor_id = aid_upgrade_client;
1500                    let tls_ctx = tls_for_upgrade_client.clone();
1501                    async move {
1502                        let _ph = PhaseLog::new("tcp.upgrade_to_tls_client");
1503                        let (conn_id_str, server_name_str) = parse_two_strings(&input)?;
1504                        let conn_id = string_to_id(&conn_id_str)?;
1505
1506                        let connector = match tls_ctx.as_ref() {
1507                            Some(ctx) => match &ctx.client_connector {
1508                                Some(c) => c.clone(),
1509                                None => {
1510                                    return Err(Value::String(
1511                                        "client_tls not configured on this handler".into(),
1512                                    ))
1513                                }
1514                            },
1515                            None => {
1516                                return Err(Value::String(
1517                                    "client_tls not configured on this handler".into(),
1518                                ))
1519                            }
1520                        };
1521
1522                        let server_name =
1523                            rustls::pki_types::ServerName::try_from(server_name_str.clone())
1524                                .map_err(|e| {
1525                                    Value::String(format!(
1526                                        "Invalid server name {:?}: {}",
1527                                        server_name_str, e
1528                                    ))
1529                                })?;
1530
1531                        let stream_arc = {
1532                            let connections = st.connections.lock().await;
1533                            let entry = connections.get(&conn_id).ok_or_else(|| {
1534                                Value::String(format!("Connection not found: {}", conn_id_str))
1535                            })?;
1536                            if entry.owner != actor_id {
1537                                return Err(Value::String(format!(
1538                                    "Connection {} not owned by this actor",
1539                                    conn_id_str
1540                                )));
1541                            }
1542                            if entry.state != ConnectionState::Active {
1543                                return Err(Value::String(format!(
1544                                    "Connection {} must be activated before TLS upgrade",
1545                                    conn_id_str
1546                                )));
1547                            }
1548                            if entry.data_mode != DataMode::Passive {
1549                                return Err(Value::String(format!(
1550                                    "Connection {} must be in passive mode for TLS upgrade",
1551                                    conn_id_str
1552                                )));
1553                            }
1554                            entry.stream.clone()
1555                        };
1556
1557                        let mut guard = stream_arc.lock().await;
1558                        let taken = std::mem::replace(&mut *guard, StreamState::Closed);
1559                        let inner = match taken {
1560                            StreamState::Full(boxed) => *boxed,
1561                            StreamState::WriteOnly(w) => {
1562                                *guard = StreamState::WriteOnly(w);
1563                                return Err(Value::String(format!(
1564                                    "Connection {} is split; TLS upgrade not supported",
1565                                    conn_id_str
1566                                )));
1567                            }
1568                            StreamState::Closed => {
1569                                return Err(Value::String(format!(
1570                                    "Connection {} is closed",
1571                                    conn_id_str
1572                                )));
1573                            }
1574                        };
1575                        let tcp = match inner {
1576                            UnifiedStream::Plain(tcp) => tcp,
1577                            other => {
1578                                *guard = StreamState::Full(Box::new(other));
1579                                return Err(Value::String(format!(
1580                                    "Connection {} is already TLS",
1581                                    conn_id_str
1582                                )));
1583                            }
1584                        };
1585
1586                        let tls_stream = match connector.connect(server_name, tcp).await {
1587                            Ok(s) => s,
1588                            Err(e) => {
1589                                return Err(Value::String(format!(
1590                                    "TLS client handshake failed: {}",
1591                                    e
1592                                )));
1593                            }
1594                        };
1595                        *guard = StreamState::Full(Box::new(UnifiedStream::ClientTls(tls_stream)));
1596                        drop(guard);
1597
1598                        debug!(
1599                            "tcp upgrade-to-tls-client conn={} server_name={}",
1600                            conn_id, server_name_str
1601                        );
1602                        Ok::<Value, Value>(Value::Tuple(vec![]))
1603                    }
1604                },
1605            )?
1606            // ----------------------------------------------------------------
1607            // close-listener(listener-id: string) -> result<_, string>
1608            // ----------------------------------------------------------------
1609            .func_async_result(
1610                "close-listener",
1611                move |_ctx: AsyncCtx<ActorStore>, input: Value| {
1612                    let st = st_close_listener.clone();
1613                    let actor_id = aid_close_listener;
1614                    async move {
1615                        let _ph = PhaseLog::new("tcp.close_listener");
1616                        let listener_id_str = parse_string(&input)?;
1617                        let listener_id = string_to_id(&listener_id_str)?;
1618
1619                        let mut listeners = st.listeners.lock().await;
1620                        let entry = listeners.get(&listener_id).ok_or_else(|| {
1621                            Value::String(format!("Listener not found: {}", listener_id_str))
1622                        })?;
1623
1624                        if entry.owner != actor_id {
1625                            return Err(Value::String(format!(
1626                                "Listener {} not owned by this actor",
1627                                listener_id_str
1628                            )));
1629                        }
1630
1631                        listeners.remove(&listener_id);
1632                        debug!("tcp close listener={}", listener_id);
1633                        Ok::<Value, Value>(Value::Tuple(vec![]))
1634                    }
1635                },
1636            )?;
1637
1638        ctx.mark_satisfied("theater:simple/tcp");
1639        info!("TCP host functions (Pack) set up successfully");
1640        Ok(())
1641    }
1642
1643    fn supports_composite(&self) -> bool {
1644        true
1645    }
1646}
1647
1648// ============================================================================
1649// Active Mode Read Loop
1650// ============================================================================
1651
1652/// Buffer size for active mode reads
1653const ACTIVE_READ_BUFFER_SIZE: usize = 8192;
1654
1655/// Remove the connection entry from the shared map and, for TLS streams in
1656/// the `WriteOnly(write_half)` state, flush the close_notify alert before the
1657/// underlying TCP gets FIN'd.
1658///
1659/// Used by `tcp_read_loop` on every cleanup branch (peer-EOF, read error,
1660/// cancellation). Without the shutdown call, dropping the WriteOnly write
1661/// half drops the rustls session without flushing its outgoing close_notify
1662/// alert — peers then observe a bare TCP FIN and surface it as
1663/// `UnexpectedEof` ("peer closed connection without sending TLS
1664/// close_notify"). The explicit `close` host function had this right
1665/// already (PR #45); this is the parallel fix for the auto-cleanup path.
1666async fn shutdown_write_half_and_remove(shared_state: &Arc<SharedTcpState>, conn_id: u64) {
1667    let stream_arc = {
1668        let mut connections = shared_state.connections.lock().await;
1669        connections.remove(&conn_id).map(|e| e.stream)
1670    };
1671    let Some(stream_arc) = stream_arc else {
1672        return;
1673    };
1674    let mut guard = stream_arc.lock().await;
1675    let taken = std::mem::replace(&mut *guard, StreamState::Closed);
1676    drop(guard);
1677    match taken {
1678        StreamState::WriteOnly(mut w) => {
1679            if let Err(e) = AsyncWriteExt::shutdown(&mut w).await {
1680                warn!(
1681                    "tcp conn={} cleanup shutdown error (close_notify may not have been sent): {}",
1682                    conn_id, e
1683                );
1684            }
1685        }
1686        StreamState::Full(mut s) => {
1687            // Active-mode normally leaves the entry in WriteOnly state, but
1688            // handle Full defensively in case the reader exits before set-active
1689            // could complete the split.
1690            if let Err(e) = AsyncWriteExt::shutdown(&mut *s).await {
1691                warn!(
1692                    "tcp conn={} cleanup shutdown error (close_notify may not have been sent): {}",
1693                    conn_id, e
1694                );
1695            }
1696        }
1697        StreamState::Closed => {}
1698    }
1699}
1700
1701/// Background task that reads from a connection and calls on-data/on-close callbacks.
1702///
1703/// This is spawned when a connection enters "active" or "once" mode.
1704async fn tcp_read_loop(
1705    conn_id: u64,
1706    mut read_half: UnifiedReadHalf,
1707    actor_handle: ActorHandle,
1708    shared_state: Arc<SharedTcpState>,
1709    is_once: bool,
1710    cancel_token: CancellationToken,
1711) {
1712    let conn_id_str = id_to_string(conn_id);
1713    info!(
1714        "tcp read loop started for conn={} (once={})",
1715        conn_id, is_once
1716    );
1717
1718    let mut buf = vec![0u8; ACTIVE_READ_BUFFER_SIZE];
1719
1720    loop {
1721        tokio::select! {
1722            _ = cancel_token.cancelled() => {
1723                info!("tcp read loop cancelled for conn={}", conn_id);
1724                shutdown_write_half_and_remove(&shared_state, conn_id).await;
1725                break;
1726            }
1727            result = read_half.read(&mut buf) => {
1728                match result {
1729                    Ok(0) => {
1730                        // EOF - connection closed by peer
1731                        info!("tcp conn={} received EOF", conn_id);
1732
1733                        // Call on-close callback
1734                        let params = Value::Tuple(vec![
1735                            Value::String(conn_id_str.clone()),
1736                            Value::String("eof".to_string()),
1737                        ]);
1738
1739                        if let Err(e) = actor_handle
1740                            .call_function("theater:simple/tcp-client.on-close".to_string(), params)
1741                            .await
1742                        {
1743                            warn!("tcp conn={} on-close callback failed: {}", conn_id, e);
1744                        }
1745
1746                        shutdown_write_half_and_remove(&shared_state, conn_id).await;
1747                        break;
1748                    }
1749                    Ok(n) => {
1750                        // Data received - call on-data callback
1751                        let data = buf[..n].to_vec();
1752                        debug!("tcp conn={} received {} bytes, calling on-data", conn_id, n);
1753
1754                        let params = Value::Tuple(vec![
1755                            Value::String(conn_id_str.clone()),
1756                            Value::List {
1757                                elem_type: ValueType::U8,
1758                                items: data.into_iter().map(Value::U8).collect(),
1759                            },
1760                        ]);
1761
1762                        if let Err(e) = actor_handle
1763                            .call_function("theater:simple/tcp-client.on-data".to_string(), params)
1764                            .await
1765                        {
1766                            error!("tcp conn={} on-data callback failed: {}", conn_id, e);
1767                            // Continue reading even if callback fails
1768                        }
1769
1770                        if is_once {
1771                            // Once mode: switch back to passive after one read
1772                            info!("tcp conn={} once mode complete, switching to passive", conn_id);
1773
1774                            // Update the connection's data mode
1775                            if let Some(entry) = shared_state.connections.lock().await.get_mut(&conn_id) {
1776                                entry.data_mode = DataMode::Passive;
1777                            }
1778                            break;
1779                        }
1780                    }
1781                    Err(e) => {
1782                        // Read error - connection broken
1783                        error!("tcp conn={} read error: {}", conn_id, e);
1784
1785                        // Call on-close callback with error
1786                        let params = Value::Tuple(vec![
1787                            Value::String(conn_id_str.clone()),
1788                            Value::String(e.to_string()),
1789                        ]);
1790
1791                        if let Err(e) = actor_handle
1792                            .call_function("theater:simple/tcp-client.on-close".to_string(), params)
1793                            .await
1794                        {
1795                            warn!("tcp conn={} on-close callback failed: {}", conn_id, e);
1796                        }
1797
1798                        shutdown_write_half_and_remove(&shared_state, conn_id).await;
1799                        break;
1800                    }
1801                }
1802            }
1803        }
1804    }
1805
1806    info!("tcp read loop stopped for conn={}", conn_id);
1807}
1808
1809#[cfg(test)]
1810mod tests {
1811    use super::*;
1812
1813    #[test]
1814    fn test_tcp_handler_creation() {
1815        let config = TcpHandlerConfig::default();
1816        let handler = TcpHandler::new(config);
1817
1818        assert_eq!(handler.name(), "tcp");
1819        assert_eq!(
1820            handler.imports(),
1821            Some(vec!["theater:simple/tcp".to_string()])
1822        );
1823        assert_eq!(
1824            handler.exports(),
1825            Some(vec!["theater:simple/tcp-client".to_string()])
1826        );
1827    }
1828
1829    #[test]
1830    fn test_tcp_handler_clone_shares_state() {
1831        let config = TcpHandlerConfig::default();
1832        let handler = TcpHandler::new(config);
1833        let cloned = handler.create_instance(None);
1834
1835        // Both should have the same name
1836        assert_eq!(cloned.name(), "tcp");
1837
1838        // The key test: shared_state Arc should be the same
1839        // (We can't easily test this without exposing internals, but the
1840        // implementation clones the Arc, not the data)
1841    }
1842
1843    #[test]
1844    fn test_tcp_interface_hash_determinism() {
1845        let interface1 = tcp_interface();
1846        let interface2 = tcp_interface();
1847        assert_eq!(interface1.hash(), interface2.hash());
1848    }
1849
1850    #[test]
1851    fn test_tcp_handler_interface_hashes() {
1852        let config = TcpHandlerConfig::default();
1853        let handler = TcpHandler::new(config);
1854
1855        let hashes = handler.interface_hashes();
1856        assert_eq!(hashes.len(), 1);
1857        assert_eq!(hashes[0].0, "theater:simple/tcp");
1858
1859        // Hash should be non-zero
1860        assert!(!hashes[0].1.as_bytes().iter().all(|&b| b == 0));
1861    }
1862
1863    #[test]
1864    fn test_connection_state_enum() {
1865        assert_ne!(ConnectionState::Pending, ConnectionState::Active);
1866    }
1867
1868    #[test]
1869    fn test_data_mode_enum() {
1870        assert_ne!(DataMode::Passive, DataMode::Active);
1871        assert_ne!(DataMode::Passive, DataMode::Once);
1872        assert_ne!(DataMode::Active, DataMode::Once);
1873    }
1874}