theater_handler_tcp/lib.rs
1//! # TCP Handler
2//!
3//! Provides raw TCP networking capabilities to WebAssembly actors in the Theater system.
4//! This handler is deliberately minimal — it moves bytes across the boundary and leaves
5//! all protocol complexity (framing, routing, addressing) to actor-space code.
6//!
7//! ## Connection Handoff
8//!
9//! Connections can be transferred between actors for the "accept and hand off" pattern:
10//!
11//! 1. Acceptor calls `accept()` - connection starts in PENDING state
12//! 2. Acceptor spawns a worker actor
13//! 3. Acceptor calls `transfer(conn_id, worker_id)` - atomically transfers and activates
14//! 4. Worker receives `handle-connection` callback and can immediately send/receive
15//!
16//! This prevents race conditions where data arrives before the handoff completes.
17//!
18//! ## Data Modes (Erlang-style)
19//!
20//! Connections support three data modes via `set-active()`:
21//!
22//! - `"passive"` (default): Data received only via explicit `receive()` calls
23//! - `"active"`: Data pushed to actor via `on-data` callback continuously
24//! - `"once"`: Single `on-data` callback, then switches back to passive
25//!
26//! This matches Erlang/OTP's `{active, true/false/once}` socket options.
27//!
28//! ## TLS Support
29//!
30//! TLS can be enabled via manifest configuration:
31//!
32//! ```toml
33//! [[handler]]
34//! type = "tcp"
35//!
36//! [handler.client_tls]
37//! enabled = true
38//! # ca_cert = "/path/to/ca.pem" # Optional custom CA
39//! # skip_verify = false # For development only
40//!
41//! [handler.server_tls]
42//! enabled = true
43//! cert = "/path/to/server.pem"
44//! key = "/path/to/server-key.pem"
45//! ```
46//!
//! When TLS is configured, connections are encrypted automatically. Actor code
//! does not need to change: it keeps calling the same `connect`, `listen`,
//! `send` and `receive` host functions.
50
51mod stream;
52mod tls;
53
54use std::collections::HashMap;
55use std::future::Future;
56use std::net::SocketAddr;
57use std::pin::Pin;
58use std::sync::atomic::{AtomicU64, Ordering};
59use std::sync::Arc;
60use std::time::Instant;
61use tokio::io::{AsyncReadExt, AsyncWriteExt};
62use tokio::net::{TcpListener, TcpStream};
63use tokio::sync::Mutex;
64use tokio_util::sync::CancellationToken;
65use tracing::{debug, error, info, warn};
66
67use stream::{UnifiedReadHalf, UnifiedStream, UnifiedWriteHalf};
68use tls::TlsContext;
69
70use theater::actor::handle::ActorHandle;
71use theater::actor::store::ActorStore;
72use theater::config::actor_manifest::{HandlerConfig, TcpHandlerConfig};
73use theater::handler::{Handler, HandlerContext, SharedActorInstance};
74use theater::id::TheaterId;
75use theater::shutdown::ShutdownReceiver;
76
77use theater::pack_bridge::{
78 parse_pact, AsyncCtx, HostLinkerBuilder, InterfaceImpl, LinkerError, TypeHash, Value, ValueType,
79};
80
81// ============================================================================
82// Interface Declarations
83// ============================================================================
84
/// Scope guard that times one host-function invocation.
///
/// Timing starts when the guard is created. The guard's `Drop` implementation
/// emits a single `phase=... elapsed_ms=...` debug line, so the line is
/// produced on every exit path of the enclosing scope, including early
/// returns taken through `?`.
struct PhaseLog {
    /// Label reported as the `phase` field.
    name: &'static str,
    /// Moment the guard was created; elapsed time is measured from here.
    start: Instant,
}

impl PhaseLog {
    /// Begins timing a phase labelled `name`; the clock starts immediately.
    fn new(name: &'static str) -> Self {
        let start = Instant::now();
        PhaseLog { name, start }
    }
}
101
102impl Drop for PhaseLog {
103 fn drop(&mut self) {
104 debug!(
105 phase = self.name,
106 elapsed_ms = self.start.elapsed().as_millis() as u64,
107 "tcp phase complete",
108 );
109 }
110}
111
/// Text of the `tcp.pact` interface definition, embedded at compile time.
///
/// `include_str!` resolves the path relative to this source file, so the pact
/// file is expected one directory above it.
const TCP_PACT: &str = include_str!("../tcp.pact");
114
115/// Declare the theater:simple/tcp interface from the pact file.
116fn tcp_interface() -> InterfaceImpl {
117 let pact = parse_pact(TCP_PACT).expect("embedded tcp.pact should be valid");
118 InterfaceImpl::from_pact(&pact)
119}
120
121// ============================================================================
122// Connection State
123// ============================================================================
124
/// Lifecycle state of a tracked connection.
///
/// Accepted connections are stored as `Pending`; outbound connections made
/// with `connect` are stored as `Active` straight away. The `activate` host
/// function moves a pending connection to `Active`, and `set-active` refuses
/// to change the data mode of a connection that is not yet `Active`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum ConnectionState {
    /// Connection accepted but not yet activated - no data operations allowed
    Pending,
    /// Connection is active - send/receive allowed
    Active,
}
133
/// How received data reaches the owning actor (Erlang-style).
///
/// Selected through the `set-active` host function using the strings
/// `"passive"`, `"active"` and `"once"`. Every connection starts as
/// `Passive`. Leaving `Passive` splits the stream and spawns a background
/// read task; the transitions back to `Passive`, and between `Active` and
/// `Once`, are rejected by `set-active`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum DataMode {
    /// Data only received via explicit receive() calls
    Passive,
    /// Data pushed to on-data callback continuously
    Active,
    /// Receive one chunk via on-data, then switch to Passive
    Once,
}
144
/// What remains of a connection's stream, which depends on its data mode.
///
/// A connection starts as `Full`. When `set-active` leaves passive mode it
/// splits the stream, hands the read half to the background read task and
/// stores the remaining half as `WriteOnly`. `Closed` is also used as the
/// temporary placeholder while the state is being swapped out.
enum StreamState {
    /// Full stream available for passive mode operations
    // Boxed so the enum itself stays small regardless of the stream's size.
    Full(Box<UnifiedStream>),
    /// Only write half available - read half taken by active mode task
    WriteOnly(UnifiedWriteHalf),
    /// Connection closed or stream taken
    Closed,
}
154
/// A tracked TCP connection with ownership and state.
///
/// `stream` is wrapped in `Arc<Mutex<...>>` so the outer connections map
/// mutex is only held briefly for lookup/metadata. The actual I/O acquires
/// the per-connection lock — this lets two actors do I/O on different
/// connections in parallel, which is essential for any flow where the
/// runtime hosts both sides of a TCP conversation (e.g. an outbound SMTP
/// client talking to a local SMTP server in the same theater instance).
struct ConnectionEntry {
    /// Per-connection stream state behind its own async lock.
    stream: Arc<Mutex<StreamState>>,
    /// Remote address captured when the connection was established or accepted.
    peer_addr: SocketAddr,
    /// Actor that currently owns the connection; host functions compare this
    /// against the caller's actor id before operating on the entry.
    owner: TheaterId,
    /// Lifecycle state (`Pending` until activated).
    state: ConnectionState,
    /// Current receive mode; starts as `DataMode::Passive`.
    data_mode: DataMode,
}
170
/// A tracked TCP listener with ownership.
///
/// The `accept` host function looks entries up by id and refuses to accept on
/// a listener whose `owner` differs from the calling actor. Shutdown cleanup
/// removes only the entries owned by the actor that is shutting down.
struct ListenerEntry {
    /// Bound listening socket.
    listener: TcpListener,
    /// Actor that owns the listener.
    owner: TheaterId,
}
176
/// Shared TCP state across all actor instances.
///
/// This state is shared via Arc, so all TcpHandler instances in a Theater
/// runtime see the same connections and listeners. This enables connection
/// transfer between actors.
struct SharedTcpState {
    /// Every tracked connection, keyed by the id handed to actors (as a string).
    connections: Mutex<HashMap<u64, ConnectionEntry>>,
    /// Every tracked listener, keyed by id.
    listeners: Mutex<HashMap<u64, ListenerEntry>>,
    /// Counter behind `next_id()`; connections and listeners draw from the
    /// same sequence, so an id is never reused across the two maps.
    next_id: AtomicU64,
    /// Upper bound on tracked connections; `None` disables the check.
    max_connections: Option<u32>,
}
188
189impl SharedTcpState {
190 fn new(max_connections: Option<u32>) -> Self {
191 Self {
192 connections: Mutex::new(HashMap::new()),
193 listeners: Mutex::new(HashMap::new()),
194 next_id: AtomicU64::new(1),
195 max_connections,
196 }
197 }
198
199 fn next_id(&self) -> u64 {
200 self.next_id.fetch_add(1, Ordering::Relaxed)
201 }
202
203 async fn check_connection_limit(&self) -> Result<(), Value> {
204 if let Some(max) = self.max_connections {
205 let count = self.connections.lock().await.len();
206 if count >= max as usize {
207 return Err(Value::String(format!(
208 "Connection limit reached ({}/{})",
209 count, max
210 )));
211 }
212 }
213 Ok(())
214 }
215}
216
217// ============================================================================
218// Handler Implementation
219// ============================================================================
220
/// Handler for providing raw TCP networking access to WebAssembly actors.
///
/// One value acts as the template; `create_instance` derives a per-actor
/// instance from it that shares `shared_state` but gets its own actor slots
/// and cancellation token.
#[derive(Clone)]
pub struct TcpHandler {
    /// TCP configuration this instance was built from.
    config: TcpHandlerConfig,
    /// Shared state across all handler instances - enables connection transfer
    shared_state: Arc<SharedTcpState>,
    /// Actor ID for this handler instance - stored by
    /// `setup_host_functions_composite` from the handler context
    actor_id: Arc<std::sync::Mutex<Option<TheaterId>>>,
    /// Actor handle for calling export functions (set in setup, read by the
    /// `listen` and `set-active` host functions)
    actor_handle: Arc<std::sync::Mutex<Option<ActorHandle>>>,
    /// Cancellation token for spawned background tasks; cancelled when the
    /// shutdown signal arrives
    cancellation_token: CancellationToken,
    /// TLS context for encrypted connections (shared across clones); holds
    /// `None` when building the context from the config failed
    tls_context: Arc<Option<TlsContext>>,
}
236
237impl TcpHandler {
238 pub fn new(config: TcpHandlerConfig) -> Self {
239 // Build TLS context from config
240 let tls_context = match TlsContext::from_config(&config) {
241 Ok(ctx) => Arc::new(ctx),
242 Err(e) => {
243 error!("Failed to build TLS context: {}. TLS will be disabled.", e);
244 Arc::new(None)
245 }
246 };
247
248 Self {
249 config,
250 shared_state: Arc::new(SharedTcpState::new(None)),
251 actor_id: Arc::new(std::sync::Mutex::new(None)),
252 actor_handle: Arc::new(std::sync::Mutex::new(None)),
253 cancellation_token: CancellationToken::new(),
254 tls_context,
255 }
256 }
257
258 /// Get the interface declarations for this handler.
259 pub fn interfaces(&self) -> Vec<InterfaceImpl> {
260 vec![tcp_interface()]
261 }
262}
263
264// ── Value parsing helpers ─────────────────────────────────────────────────
265
266fn parse_string(input: &Value) -> Result<String, Value> {
267 match input {
268 Value::String(s) => Ok(s.clone()),
269 Value::Tuple(fields) if fields.len() == 1 => match &fields[0] {
270 Value::String(s) => Ok(s.clone()),
271 _ => Err(Value::String("Expected string".to_string())),
272 },
273 _ => Err(Value::String("Expected string".to_string())),
274 }
275}
276
277fn parse_two_strings(input: &Value) -> Result<(String, String), Value> {
278 match input {
279 Value::Tuple(fields) if fields.len() == 2 => {
280 let a = match &fields[0] {
281 Value::String(s) => s.clone(),
282 _ => return Err(Value::String("Expected string for first arg".to_string())),
283 };
284 let b = match &fields[1] {
285 Value::String(s) => s.clone(),
286 _ => return Err(Value::String("Expected string for second arg".to_string())),
287 };
288 Ok((a, b))
289 }
290 _ => Err(Value::String("Expected tuple (string, string)".to_string())),
291 }
292}
293
294fn parse_string_and_bytes(input: &Value) -> Result<(String, Vec<u8>), Value> {
295 match input {
296 Value::Tuple(fields) if fields.len() == 2 => {
297 let id = match &fields[0] {
298 Value::String(s) => s.clone(),
299 _ => return Err(Value::String("Expected string for id".to_string())),
300 };
301 let data = match &fields[1] {
302 Value::List { items, .. } => items
303 .iter()
304 .filter_map(|v| match v {
305 Value::U8(b) => Some(*b),
306 _ => None,
307 })
308 .collect::<Vec<u8>>(),
309 _ => return Err(Value::String("Expected list<u8> for data".to_string())),
310 };
311 Ok((id, data))
312 }
313 _ => Err(Value::String("Expected tuple (id, data)".to_string())),
314 }
315}
316
317fn parse_string_and_u32(input: &Value) -> Result<(String, u32), Value> {
318 match input {
319 Value::Tuple(fields) if fields.len() == 2 => {
320 let id = match &fields[0] {
321 Value::String(s) => s.clone(),
322 _ => return Err(Value::String("Expected string for id".to_string())),
323 };
324 let n = match &fields[1] {
325 Value::U32(n) => *n,
326 _ => return Err(Value::String("Expected u32".to_string())),
327 };
328 Ok((id, n))
329 }
330 _ => Err(Value::String("Expected tuple (id, u32)".to_string())),
331 }
332}
333
/// Renders a numeric id as the decimal string handed to actors.
fn id_to_string(id: u64) -> String {
    u64::to_string(&id)
}
337
338fn string_to_id(s: &str) -> Result<u64, Value> {
339 s.parse::<u64>()
340 .map_err(|_| Value::String(format!("Invalid id: {}", s)))
341}
342
343// ── Handler implementation ────────────────────────────────────────────────
344
345impl Handler for TcpHandler {
346 fn create_instance(&self, config: Option<&HandlerConfig>) -> Box<dyn Handler> {
347 let tcp_config = match config {
348 Some(HandlerConfig::Tcp { config }) => config.clone(),
349 _ => self.config.clone(),
350 };
351
352 // Build TLS context from config if different from current
353 let tls_context = if config.is_some() {
354 // New config provided, rebuild TLS context
355 match TlsContext::from_config(&tcp_config) {
356 Ok(ctx) => Arc::new(ctx),
357 Err(e) => {
358 error!("Failed to build TLS context: {}. TLS will be disabled.", e);
359 Arc::new(None)
360 }
361 }
362 } else {
363 // Reuse existing TLS context
364 self.tls_context.clone()
365 };
366
367 // Share the same state across all instances - this is the key for transfer!
368 // Each instance gets its own cancellation token (cancelled when that actor shuts down)
369 Box::new(TcpHandler {
370 config: tcp_config,
371 shared_state: self.shared_state.clone(), // Same Arc!
372 actor_id: Arc::new(std::sync::Mutex::new(None)),
373 actor_handle: Arc::new(std::sync::Mutex::new(None)),
374 cancellation_token: CancellationToken::new(),
375 tls_context,
376 })
377 }
378
379 fn name(&self) -> &str {
380 "tcp"
381 }
382
383 fn imports(&self) -> Option<Vec<String>> {
384 Some(
385 self.interfaces()
386 .iter()
387 .map(|i| i.name().to_string())
388 .collect(),
389 )
390 }
391
392 fn exports(&self) -> Option<Vec<String>> {
393 Some(vec!["theater:simple/tcp-client".to_string()])
394 }
395
396 fn interface_hashes(&self) -> Vec<(String, TypeHash)> {
397 self.interfaces()
398 .iter()
399 .map(|i| (i.name().to_string(), i.hash()))
400 .collect()
401 }
402
403 fn interfaces(&self) -> Vec<theater::pack_bridge::InterfaceImpl> {
404 vec![tcp_interface()]
405 }
406
407 fn setup(
408 &mut self,
409 actor_handle: ActorHandle,
410 _actor_instance: SharedActorInstance,
411 shutdown_receiver: ShutdownReceiver,
412 _event_rx: theater::handler::HandlerEventReceiver,
413 ) -> Pin<Box<dyn Future<Output = anyhow::Result<()>> + Send>> {
414 info!("TCP handler setup (passive mode)");
415
416 // Store the actor_handle for use by listen()
417 {
418 let mut handle_guard = self.actor_handle.lock().unwrap();
419 *handle_guard = Some(actor_handle);
420 }
421
422 // Get cancellation token to cancel on shutdown
423 let cancel_token = self.cancellation_token.clone();
424 let shared_state = self.shared_state.clone();
425 let actor_id_for_cleanup = self.actor_id.clone();
426
427 // Wait for shutdown, then clean up all resources OWNED BY THIS ACTOR.
428 // The connections + listeners maps are shared across every TcpHandler
429 // instance in the runtime — one map, many actors. So the cleanup must
430 // filter by owner; clearing the whole map would wipe in-flight
431 // connections owned by other actors. For TLS connections it also
432 // drives `AsyncWriteExt::shutdown` so close_notify reaches the wire
433 // before TCP FIN (the same shape as the `close` host function).
434 Box::pin(async move {
435 info!("TCP handler setup waiting for shutdown signal");
436 shutdown_receiver.wait_for_shutdown().await;
437 info!("TCP handler received shutdown, cleaning up resources");
438
439 // Cancel all spawned background tasks (listeners, active mode readers)
440 cancel_token.cancel();
441 info!("TCP handler cancellation token cancelled");
442
443 let actor_id_val = *actor_id_for_cleanup.lock().unwrap();
444
445 // Collect connection IDs owned by this actor; release the outer
446 // lock before driving per-connection shutdowns (those acquire
447 // the inner stream mutex and may await).
448 let to_remove: Vec<u64> = {
449 let connections = shared_state.connections.lock().await;
450 connections
451 .iter()
452 .filter(|(_, entry)| Some(entry.owner) == actor_id_val)
453 .map(|(id, _)| *id)
454 .collect()
455 };
456 let conn_count = to_remove.len();
457 for conn_id in to_remove {
458 shutdown_write_half_and_remove(&shared_state, conn_id).await;
459 }
460 if conn_count > 0 {
461 info!(
462 "TCP handler closed {} connections owned by actor {:?}",
463 conn_count, actor_id_val
464 );
465 }
466
467 // Same owner-filter for listeners.
468 {
469 let mut listeners = shared_state.listeners.lock().await;
470 let before = listeners.len();
471 listeners.retain(|_, entry| Some(entry.owner) != actor_id_val);
472 let removed = before - listeners.len();
473 if removed > 0 {
474 info!(
475 "TCP handler closed {} listeners owned by actor {:?}",
476 removed, actor_id_val
477 );
478 }
479 }
480
481 info!("TCP handler shutdown complete");
482 Ok(())
483 })
484 }
485
486 fn setup_host_functions_composite(
487 &mut self,
488 builder: &mut HostLinkerBuilder<'_, ActorStore>,
489 ctx: &mut HandlerContext,
490 ) -> Result<(), LinkerError> {
491 info!("Setting up TCP host functions (Pack)");
492
493 if ctx.is_satisfied("theater:simple/tcp") {
494 info!("theater:simple/tcp already satisfied, skipping");
495 return Ok(());
496 }
497
498 // Get actor ID from context
499 let actor_id = ctx
500 .actor_id
501 .expect("actor_id should be set in HandlerContext");
502
503 // Store actor_id for this instance
504 {
505 let mut id_guard = self.actor_id.lock().unwrap();
506 *id_guard = Some(actor_id);
507 }
508
509 // Update max_connections if configured
510 // Note: We can't easily update the shared state's max_connections here
511 // since it's already created. For now, first handler wins.
512
513 let state = self.shared_state.clone();
514 let actor_id_for_closures = actor_id;
515
516 // Clone handler fields for use in listen() callback
517 let actor_handle_for_listen = self.actor_handle.clone();
518 let cancel_token_for_listen = self.cancellation_token.clone();
519
520 // Clone state and actor_id for each closure
521 let st_connect = state.clone();
522 let aid_connect = actor_id_for_closures;
523 let tls_for_connect = self.tls_context.clone();
524
525 let st_listen = state.clone();
526 let aid_listen = actor_id_for_closures;
527 let tls_for_listen = self.tls_context.clone();
528
529 let st_accept = state.clone();
530 let aid_accept = actor_id_for_closures;
531 let tls_for_accept = self.tls_context.clone();
532
533 let st_activate = state.clone();
534 let aid_activate = actor_id_for_closures;
535
536 let st_set_active = state.clone();
537 let aid_set_active = actor_id_for_closures;
538 let actor_handle_for_set_active = self.actor_handle.clone();
539 let cancel_token_for_set_active = self.cancellation_token.clone();
540
541 let st_transfer = state.clone();
542 let aid_transfer = actor_id_for_closures;
543
544 let st_peer = state.clone();
545 let aid_peer = actor_id_for_closures;
546
547 let st_send = state.clone();
548 let aid_send = actor_id_for_closures;
549
550 let st_receive = state.clone();
551 let aid_receive = actor_id_for_closures;
552 let cancel_token_for_receive = self.cancellation_token.clone();
553
554 let st_close = state.clone();
555 let aid_close = actor_id_for_closures;
556
557 let st_upgrade_server = state.clone();
558 let aid_upgrade_server = actor_id_for_closures;
559 let tls_for_upgrade_server = self.tls_context.clone();
560
561 let st_upgrade_client = state.clone();
562 let aid_upgrade_client = actor_id_for_closures;
563 let tls_for_upgrade_client = self.tls_context.clone();
564
565 let st_close_listener = state.clone();
566 let aid_close_listener = actor_id_for_closures;
567
568 builder
569 .interface("theater:simple/tcp")?
570 // ----------------------------------------------------------------
571 // connect(address: string) -> result<string, string>
572 // Outbound connections are immediately active
573 // ----------------------------------------------------------------
574 .func_async_result(
575 "connect",
576 move |_ctx: AsyncCtx<ActorStore>, input: Value| {
577 let st = st_connect.clone();
578 let actor_id = aid_connect;
579 let tls_ctx = tls_for_connect.clone();
580 async move {
581 let _ph = PhaseLog::new("tcp.connect");
582 let address = parse_string(&input)?;
583 st.check_connection_limit().await?;
584 debug!("tcp connect to {}", address);
585
586 let tcp_stream = TcpStream::connect(&address)
587 .await
588 .map_err(|e| Value::String(e.to_string()))?;
589
590 let peer_addr = tcp_stream
591 .peer_addr()
592 .map_err(|e| Value::String(e.to_string()))?;
593
594 // Apply TLS if configured AND auto_handshake is enabled.
595 // STARTTLS-style protocols set client_tls.auto_handshake = false:
596 // the connector is built (so upgrade-to-tls-client works) but
597 // connect() returns a plain TCP stream until the actor explicitly
598 // upgrades it after negotiating the STARTTLS handshake.
599 let auto_handshake = tls_ctx
600 .as_ref()
601 .as_ref()
602 .map(|c| c.client_auto_handshake)
603 .unwrap_or(true);
604 let unified_stream = if !auto_handshake {
605 UnifiedStream::Plain(tcp_stream)
606 } else if let Some(ref ctx) = *tls_ctx {
607 if let Some(ref connector) = ctx.client_connector {
608 // Extract hostname from address for SNI
609 let server_name = tls::parse_server_name(
610 address.split(':').next().unwrap_or(&address),
611 )
612 .map_err(|e| Value::String(e.to_string()))?;
613
614 debug!("tcp connect: performing TLS handshake with SNI {:?}", server_name);
615 let tls_stream = connector
616 .connect(server_name, tcp_stream)
617 .await
618 .map_err(|e| Value::String(format!("TLS handshake failed: {}", e)))?;
619 info!("tcp connect: TLS handshake complete");
620 UnifiedStream::ClientTls(tls_stream)
621 } else {
622 UnifiedStream::Plain(tcp_stream)
623 }
624 } else {
625 UnifiedStream::Plain(tcp_stream)
626 };
627
628 let id = st.next_id();
629 st.connections.lock().await.insert(
630 id,
631 ConnectionEntry {
632 stream: Arc::new(Mutex::new(StreamState::Full(Box::new(
633 unified_stream,
634 )))),
635 peer_addr,
636 owner: actor_id,
637 state: ConnectionState::Active, // Outbound = active
638 data_mode: DataMode::Passive,
639 },
640 );
641 debug!("tcp connected to {} as conn={}", address, id);
642 Ok::<Value, Value>(Value::String(id_to_string(id)))
643 }
644 },
645 )?
646 // ----------------------------------------------------------------
647 // listen(address: string) -> result<string, string>
648 // Binds a listener and spawns a background accept loop
649 // ----------------------------------------------------------------
650 .func_async_result(
651 "listen",
652 move |_ctx: AsyncCtx<ActorStore>, input: Value| {
653 let st = st_listen.clone();
654 let actor_id = aid_listen;
655 let actor_handle_arc = actor_handle_for_listen.clone();
656 let cancel_token = cancel_token_for_listen.clone();
657 let tls_ctx = tls_for_listen.clone();
658
659 async move {
660 let _ph = PhaseLog::new("tcp.listen");
661 let address = parse_string(&input)?;
662 debug!("tcp listen on {}", address);
663
664 let listener = TcpListener::bind(&address)
665 .await
666 .map_err(|e| Value::String(e.to_string()))?;
667
668 let listener_id = st.next_id();
669 let has_tls = match tls_ctx.as_ref() {
670 Some(ctx) => ctx.server_acceptor.is_some(),
671 None => false,
672 };
673 info!(
674 "tcp listening on {} as listener={} (tls={})",
675 address, listener_id, has_tls
676 );
677
678 // Take the actor_handle for use in the accept loop
679 let actor_handle = {
680 let guard = actor_handle_arc.lock().unwrap();
681 guard.clone()
682 };
683 let Some(actor_handle) = actor_handle else {
684 return Err(Value::String(
685 "Actor handle not available - setup() not called?".to_string(),
686 ));
687 };
688
689 // Clone state for the background task
690 let st_for_task = st.clone();
691 let actor_id_for_task = actor_id;
692
693 // Spawn background accept loop with cancellation support
694 tokio::spawn(async move {
695 info!("TCP accept loop started for listener={}", listener_id);
696
697 loop {
698 tokio::select! {
699 _ = cancel_token.cancelled() => {
700 info!("TCP accept loop cancelled for listener={}", listener_id);
701 break;
702 }
703 result = listener.accept() => {
704 match result {
705 Ok((tcp_stream, peer_addr)) => {
706 let conn_id = st_for_task.next_id();
707 info!(
708 "tcp accepted conn={} from {} on listener={}",
709 conn_id, peer_addr, listener_id
710 );
711
712 // Apply TLS if configured
713 let unified_stream = if let Some(ref ctx) = *tls_ctx {
714 if let Some(ref acceptor) = ctx.server_acceptor {
715 debug!("tcp accept: performing TLS handshake for conn={}", conn_id);
716 match acceptor.accept(tcp_stream).await {
717 Ok(tls_stream) => {
718 info!("tcp accept: TLS handshake complete for conn={}", conn_id);
719 UnifiedStream::ServerTls(tls_stream)
720 }
721 Err(e) => {
722 error!("TLS handshake failed for conn={}: {}", conn_id, e);
723 continue; // Skip this connection
724 }
725 }
726 } else {
727 UnifiedStream::Plain(tcp_stream)
728 }
729 } else {
730 UnifiedStream::Plain(tcp_stream)
731 };
732
733 // Store connection in PENDING state
734 st_for_task.connections.lock().await.insert(
735 conn_id,
736 ConnectionEntry {
737 stream: Arc::new(Mutex::new(
738 StreamState::Full(Box::new(unified_stream)),
739 )),
740 peer_addr,
741 owner: actor_id_for_task,
742 state: ConnectionState::Pending,
743 data_mode: DataMode::Passive,
744 },
745 );
746
747 // Detach so a slow/blocked handle-connection in the actor
748 // cannot wedge the accept loop and saturate the kernel SYN queue.
749 let conn_id_str = id_to_string(conn_id);
750 let params =
751 Value::Tuple(vec![Value::String(conn_id_str)]);
752 let actor_handle_for_call = actor_handle.clone();
753 let st_for_cleanup = st_for_task.clone();
754 tokio::spawn(async move {
755 if let Err(e) = actor_handle_for_call
756 .call_function(
757 "theater:simple/tcp-client.handle-connection"
758 .to_string(),
759 params,
760 )
761 .await
762 {
763 error!(
764 "Failed to call handle-connection for conn={}: {}",
765 conn_id, e
766 );
767 // Clean up the pending connection
768 st_for_cleanup
769 .connections
770 .lock()
771 .await
772 .remove(&conn_id);
773 }
774 });
775 }
776 Err(e) => {
777 error!(
778 "TCP accept error on listener={}: {}",
779 listener_id, e
780 );
781 }
782 }
783 }
784 }
785 }
786
787 info!("TCP accept loop stopped for listener={}", listener_id);
788 });
789
790 Ok::<Value, Value>(Value::String(id_to_string(listener_id)))
791 }
792 },
793 )?
794 // ----------------------------------------------------------------
795 // accept(listener-id: string) -> result<string, string>
796 // Manual accept - returns connection in PENDING state
797 // ----------------------------------------------------------------
798 .func_async_result(
799 "accept",
800 move |_ctx: AsyncCtx<ActorStore>, input: Value| {
801 let st = st_accept.clone();
802 let actor_id = aid_accept;
803 let tls_ctx = tls_for_accept.clone();
804 async move {
805 let _ph = PhaseLog::new("tcp.accept");
806 let listener_id_str = parse_string(&input)?;
807 let listener_id = string_to_id(&listener_id_str)?;
808 debug!("tcp accept on listener={}", listener_id);
809
810 // Check ownership
811 let mut listeners = st.listeners.lock().await;
812 let entry = listeners.get_mut(&listener_id).ok_or_else(|| {
813 Value::String(format!("Listener not found: {}", listener_id_str))
814 })?;
815
816 if entry.owner != actor_id {
817 return Err(Value::String(format!(
818 "Listener {} not owned by this actor",
819 listener_id_str
820 )));
821 }
822
823 let (tcp_stream, peer_addr) = entry
824 .listener
825 .accept()
826 .await
827 .map_err(|e| Value::String(e.to_string()))?;
828
829 let conn_id = st.next_id();
830 drop(listeners); // Release lock before acquiring connections lock
831
832 // Apply TLS if configured
833 let unified_stream = if let Some(ref ctx) = *tls_ctx {
834 if let Some(ref acceptor) = ctx.server_acceptor {
835 debug!("tcp manual accept: performing TLS handshake for conn={}", conn_id);
836 let tls_stream = acceptor
837 .accept(tcp_stream)
838 .await
839 .map_err(|e| Value::String(format!("TLS handshake failed: {}", e)))?;
840 info!("tcp manual accept: TLS handshake complete for conn={}", conn_id);
841 UnifiedStream::ServerTls(tls_stream)
842 } else {
843 UnifiedStream::Plain(tcp_stream)
844 }
845 } else {
846 UnifiedStream::Plain(tcp_stream)
847 };
848
849 st.connections.lock().await.insert(
850 conn_id,
851 ConnectionEntry {
852 stream: Arc::new(Mutex::new(StreamState::Full(Box::new(
853 unified_stream,
854 )))),
855 peer_addr,
856 owner: actor_id,
857 state: ConnectionState::Pending, // Starts pending!
858 data_mode: DataMode::Passive,
859 },
860 );
861 debug!("tcp accepted conn={} from {} (pending)", conn_id, peer_addr);
862 Ok::<Value, Value>(Value::String(id_to_string(conn_id)))
863 }
864 },
865 )?
866 // ----------------------------------------------------------------
867 // activate(connection-id: string) -> result<_, string>
868 // Activate a pending connection for this actor
869 // ----------------------------------------------------------------
870 .func_async_result(
871 "activate",
872 move |_ctx: AsyncCtx<ActorStore>, input: Value| {
873 let st = st_activate.clone();
874 let actor_id = aid_activate;
875 async move {
876 let _ph = PhaseLog::new("tcp.activate");
877 let conn_id_str = parse_string(&input)?;
878 let conn_id = string_to_id(&conn_id_str)?;
879
880 let mut connections = st.connections.lock().await;
881 let entry = connections.get_mut(&conn_id).ok_or_else(|| {
882 Value::String(format!("Connection not found: {}", conn_id_str))
883 })?;
884
885 if entry.owner != actor_id {
886 return Err(Value::String(format!(
887 "Connection {} not owned by this actor",
888 conn_id_str
889 )));
890 }
891
892 if entry.state == ConnectionState::Active {
893 return Err(Value::String(format!(
894 "Connection {} is already active",
895 conn_id_str
896 )));
897 }
898
899 entry.state = ConnectionState::Active;
900 debug!("tcp activated conn={}", conn_id);
901 Ok::<Value, Value>(Value::Tuple(vec![]))
902 }
903 },
904 )?
905 // ----------------------------------------------------------------
906 // set-active(connection-id: string, mode: string) -> result<_, string>
907 // Set data mode: "passive", "active", or "once"
908 // ----------------------------------------------------------------
909 .func_async_result(
910 "set-active",
911 move |_ctx: AsyncCtx<ActorStore>, input: Value| {
912 let st = st_set_active.clone();
913 let actor_id = aid_set_active;
914 let actor_handle_arc = actor_handle_for_set_active.clone();
915 let cancel_token = cancel_token_for_set_active.clone();
916 async move {
917 let _ph = PhaseLog::new("tcp.set_active");
918 let (conn_id_str, mode_str) = parse_two_strings(&input)?;
919 let conn_id = string_to_id(&conn_id_str)?;
920
921 let new_mode = match mode_str.as_str() {
922 "passive" => DataMode::Passive,
923 "active" => DataMode::Active,
924 "once" => DataMode::Once,
925 _ => {
926 return Err(Value::String(format!(
927 "Invalid mode '{}': expected 'passive', 'active', or 'once'",
928 mode_str
929 )));
930 }
931 };
932
933 let mut connections = st.connections.lock().await;
934 let entry = connections.get_mut(&conn_id).ok_or_else(|| {
935 Value::String(format!("Connection not found: {}", conn_id_str))
936 })?;
937
938 if entry.owner != actor_id {
939 return Err(Value::String(format!(
940 "Connection {} not owned by this actor",
941 conn_id_str
942 )));
943 }
944
945 if entry.state != ConnectionState::Active {
946 return Err(Value::String(format!(
947 "Connection {} must be activated before setting data mode",
948 conn_id_str
949 )));
950 }
951
952 let old_mode = entry.data_mode;
953
954 // Handle mode transitions
955 match (old_mode, new_mode) {
956 (DataMode::Passive, DataMode::Active | DataMode::Once) => {
957 // Transitioning to active/once mode - split stream and spawn reader
958 let mut stream_guard = entry.stream.lock().await;
959 let stream = std::mem::replace(&mut *stream_guard, StreamState::Closed);
960 let full_stream = match stream {
961 StreamState::Full(s) => s,
962 other @ StreamState::WriteOnly(_) => {
963 // Restore — we replaced with Closed above.
964 *stream_guard = other;
965 return Err(Value::String(format!(
966 "Connection {} is already in active mode",
967 conn_id_str
968 )));
969 }
970 StreamState::Closed => {
971 return Err(Value::String(format!(
972 "Connection {} is closed",
973 conn_id_str
974 )));
975 }
976 };
977
978 let (read_half, write_half) = full_stream.into_split();
979 *stream_guard = StreamState::WriteOnly(write_half);
980 drop(stream_guard);
981 entry.data_mode = new_mode;
982
983 // Get actor handle for callbacks
984 let actor_handle = {
985 let guard = actor_handle_arc.lock().unwrap();
986 guard.clone()
987 };
988 let Some(actor_handle) = actor_handle else {
989 return Err(Value::String(
990 "Actor handle not available".to_string(),
991 ));
992 };
993
994 // Spawn background read task with cancellation support
995 let conn_id_for_task = conn_id;
996 let st_for_task = st.clone();
997 let is_once = new_mode == DataMode::Once;
998 let cancel_token_for_task = cancel_token.clone();
999
1000 tokio::spawn(async move {
1001 tcp_read_loop(
1002 conn_id_for_task,
1003 read_half,
1004 actor_handle,
1005 st_for_task,
1006 is_once,
1007 cancel_token_for_task,
1008 )
1009 .await;
1010 });
1011
1012 info!(
1013 "tcp conn={} set to {} mode, read loop spawned",
1014 conn_id, mode_str
1015 );
1016 }
1017 (DataMode::Active | DataMode::Once, DataMode::Passive) => {
1018 // Can't go back to passive once in active mode (stream is split)
1019 return Err(Value::String(format!(
1020 "Cannot switch connection {} back to passive mode (stream is split)",
1021 conn_id_str
1022 )));
1023 }
1024 (DataMode::Active, DataMode::Once) | (DataMode::Once, DataMode::Active) => {
1025 // Can't switch between active and once (would need to stop/restart reader)
1026 return Err(Value::String(format!(
1027 "Cannot switch connection {} between active and once modes",
1028 conn_id_str
1029 )));
1030 }
1031 _ => {
1032 // Same mode, no-op
1033 debug!("tcp conn={} already in {} mode", conn_id, mode_str);
1034 }
1035 }
1036
1037 Ok::<Value, Value>(Value::Tuple(vec![]))
1038 }
1039 },
1040 )?
1041 // ----------------------------------------------------------------
1042 // transfer(connection-id: string, target-actor: string) -> result<_, string>
1043 // Transfer connection to another actor (and activate it)
1044 // ----------------------------------------------------------------
1045 .func_async_result(
1046 "transfer",
1047 move |ctx: AsyncCtx<ActorStore>, input: Value| {
1048 let st = st_transfer.clone();
1049 let actor_id = aid_transfer;
1050 async move {
1051 let _ph = PhaseLog::new("tcp.transfer");
1052 let (conn_id_str, target_actor_str) = parse_two_strings(&input)?;
1053 let conn_id = string_to_id(&conn_id_str)?;
1054
1055 let target_actor: TheaterId = target_actor_str
1056 .parse()
1057 .map_err(|e| Value::String(format!("Invalid actor ID: {}", e)))?;
1058
1059 {
1060 let mut connections = st.connections.lock().await;
1061 let entry = connections.get_mut(&conn_id).ok_or_else(|| {
1062 Value::String(format!("Connection not found: {}", conn_id_str))
1063 })?;
1064
1065 if entry.owner != actor_id {
1066 return Err(Value::String(format!(
1067 "Connection {} not owned by this actor",
1068 conn_id_str
1069 )));
1070 }
1071
1072 // Transfer ownership and activate
1073 let old_owner = entry.owner;
1074 entry.owner = target_actor;
1075 entry.state = ConnectionState::Active;
1076
1077 info!(
1078 "tcp transferred conn={} from {} to {} (now active)",
1079 conn_id, old_owner, target_actor
1080 );
1081 }
1082
1083 // Get target actor's handle and call handle-connection-transfer
1084 let store = ctx.data();
1085 let theater_tx = store.theater_tx.clone();
1086
1087 let (handle_tx, handle_rx) = tokio::sync::oneshot::channel();
1088 let get_handle_cmd = theater::messages::TheaterCommand::GetActorHandle {
1089 actor_id: target_actor,
1090 response_tx: handle_tx,
1091 };
1092 theater_tx.send(get_handle_cmd).await
1093 .map_err(|e| Value::String(format!("Failed to get target handle: {}", e)))?;
1094
1095 let target_handle = match handle_rx.await {
1096 Ok(Some(handle)) => handle,
1097 Ok(None) => return Err(Value::String("Target actor handle not found".to_string())),
1098 Err(e) => return Err(Value::String(format!("Failed to receive handle: {}", e))),
1099 };
1100
1101 // Call handle-connection-transfer on target
1102 // Just pass conn_id - runtime will prepend state to make (state, conn_id)
1103 let params = Value::String(conn_id_str.clone());
1104 if let Err(e) = target_handle
1105 .call_function(
1106 "theater:simple/tcp-client.handle-connection-transfer".to_string(),
1107 params,
1108 )
1109 .await
1110 {
1111 warn!("Failed to call handle-connection-transfer: {:?}", e);
1112 // Don't fail the transfer, just log the warning
1113 }
1114
1115 Ok::<Value, Value>(Value::Tuple(vec![]))
1116 }
1117 },
1118 )?
1119 // ----------------------------------------------------------------
1120 // peer-address(connection-id: string) -> result<string, string>
1121 // Get peer address (works in pending or active state)
1122 // ----------------------------------------------------------------
1123 .func_async_result(
1124 "peer-address",
1125 move |_ctx: AsyncCtx<ActorStore>, input: Value| {
1126 let st = st_peer.clone();
1127 let actor_id = aid_peer;
1128 async move {
1129 let _ph = PhaseLog::new("tcp.peer_address");
1130 let conn_id_str = parse_string(&input)?;
1131 let conn_id = string_to_id(&conn_id_str)?;
1132
1133 let connections = st.connections.lock().await;
1134 let entry = connections.get(&conn_id).ok_or_else(|| {
1135 Value::String(format!("Connection not found: {}", conn_id_str))
1136 })?;
1137
1138 if entry.owner != actor_id {
1139 return Err(Value::String(format!(
1140 "Connection {} not owned by this actor",
1141 conn_id_str
1142 )));
1143 }
1144
1145 Ok::<Value, Value>(Value::String(entry.peer_addr.to_string()))
1146 }
1147 },
1148 )?
1149 // ----------------------------------------------------------------
1150 // send(connection-id: string, data: list<u8>) -> result<u64, string>
1151 // ----------------------------------------------------------------
1152 .func_async_result(
1153 "send",
1154 move |_ctx: AsyncCtx<ActorStore>, input: Value| {
1155 let st = st_send.clone();
1156 let actor_id = aid_send;
1157 async move {
1158 let _ph = PhaseLog::new("tcp.send");
1159 let (conn_id_str, data) = parse_string_and_bytes(&input)?;
1160 let conn_id = string_to_id(&conn_id_str)?;
1161 let len = data.len();
1162
1163 // Lock the outer map only long enough to validate metadata
1164 // and clone the per-connection stream Arc. The actual I/O
1165 // runs without holding the outer lock — that lets two
1166 // actors do I/O on different connections in parallel.
1167 let stream_arc = {
1168 let connections = st.connections.lock().await;
1169 let entry = connections.get(&conn_id).ok_or_else(|| {
1170 Value::String(format!("Connection not found: {}", conn_id_str))
1171 })?;
1172
1173 if entry.owner != actor_id {
1174 return Err(Value::String(format!(
1175 "Connection {} not owned by this actor",
1176 conn_id_str
1177 )));
1178 }
1179
1180 if entry.state == ConnectionState::Pending {
1181 return Err(Value::String(format!(
1182 "Connection {} is pending - call activate() or transfer() first",
1183 conn_id_str
1184 )));
1185 }
1186
1187 entry.stream.clone()
1188 };
1189
1190 let mut stream_guard = stream_arc.lock().await;
1191 match &mut *stream_guard {
1192 StreamState::Full(stream) => {
1193 stream
1194 .write_all(&data)
1195 .await
1196 .map_err(|e| Value::String(e.to_string()))?;
1197 }
1198 StreamState::WriteOnly(write_half) => {
1199 write_half
1200 .write_all(&data)
1201 .await
1202 .map_err(|e| Value::String(e.to_string()))?;
1203 }
1204 StreamState::Closed => {
1205 return Err(Value::String(format!(
1206 "Connection {} is closed",
1207 conn_id_str
1208 )));
1209 }
1210 }
1211
1212 debug!("tcp send conn={} {} bytes", conn_id, len);
1213 Ok::<Value, Value>(Value::U64(len as u64))
1214 }
1215 },
1216 )?
1217 // ----------------------------------------------------------------
1218 // receive(connection-id: string, max-bytes: u32) -> result<list<u8>, string>
1219 // ----------------------------------------------------------------
1220 .func_async_result(
1221 "receive",
1222 move |_ctx: AsyncCtx<ActorStore>, input: Value| {
1223 let st = st_receive.clone();
1224 let actor_id = aid_receive;
1225 let cancel_token = cancel_token_for_receive.clone();
1226 async move {
1227 let _ph = PhaseLog::new("tcp.receive");
1228 let (conn_id_str, max_bytes) = parse_string_and_u32(&input)?;
1229 let conn_id = string_to_id(&conn_id_str)?;
1230
1231 // Lock the outer map only long enough to validate metadata
1232 // and clone the per-connection stream Arc. The actual read
1233 // runs without holding the outer lock so other actors can
1234 // do their own I/O concurrently on other connections.
1235 let stream_arc = {
1236 let connections = st.connections.lock().await;
1237 let entry = connections.get(&conn_id).ok_or_else(|| {
1238 Value::String(format!("Connection not found: {}", conn_id_str))
1239 })?;
1240
1241 if entry.owner != actor_id {
1242 return Err(Value::String(format!(
1243 "Connection {} not owned by this actor",
1244 conn_id_str
1245 )));
1246 }
1247
1248 if entry.state == ConnectionState::Pending {
1249 return Err(Value::String(format!(
1250 "Connection {} is pending - call activate() or transfer() first",
1251 conn_id_str
1252 )));
1253 }
1254
1255 if entry.data_mode != DataMode::Passive {
1256 return Err(Value::String(format!(
1257 "Connection {} is in active mode - data is pushed via on-data callback",
1258 conn_id_str
1259 )));
1260 }
1261
1262 entry.stream.clone()
1263 };
1264
1265 let mut stream_guard = stream_arc.lock().await;
1266 let stream = match &mut *stream_guard {
1267 StreamState::Full(stream) => stream,
1268 StreamState::WriteOnly(_) => {
1269 return Err(Value::String(format!(
1270 "Connection {} read half not available (in active mode)",
1271 conn_id_str
1272 )));
1273 }
1274 StreamState::Closed => {
1275 return Err(Value::String(format!(
1276 "Connection {} is closed",
1277 conn_id_str
1278 )));
1279 }
1280 };
1281
1282 let mut buf = vec![0u8; max_bytes as usize];
1283
1284 // Use select to make the read interruptible on shutdown
1285 let n = tokio::select! {
1286 result = stream.read(&mut buf) => {
1287 result.map_err(|e| Value::String(e.to_string()))?
1288 }
1289 _ = cancel_token.cancelled() => {
1290 info!("TCP receive cancelled due to shutdown, conn={}", conn_id);
1291 return Err(Value::String("Connection closed: actor shutting down".to_string()));
1292 }
1293 };
1294
1295 debug!(
1296 "tcp receive conn={} {} bytes (max={})",
1297 conn_id, n, max_bytes
1298 );
1299 buf.truncate(n);
1300 Ok::<Value, Value>(Value::List {
1301 elem_type: ValueType::U8,
1302 items: buf.into_iter().map(Value::U8).collect(),
1303 })
1304 }
1305 },
1306 )?
1307 // ----------------------------------------------------------------
1308 // close(connection-id: string) -> result<_, string>
1309 //
1310 // Gracefully shuts the write side of the stream before dropping —
1311 // for TLS streams this sends the close_notify alert so strict
1312 // clients (e.g. rustls) don't see an "unexpected EOF". Plain TCP
1313 // streams get a normal FIN.
1314 // ----------------------------------------------------------------
1315 .func_async_result(
1316 "close",
1317 move |_ctx: AsyncCtx<ActorStore>, input: Value| {
1318 let st = st_close.clone();
1319 let actor_id = aid_close;
1320 async move {
1321 let _ph = PhaseLog::new("tcp.close");
1322 let conn_id_str = parse_string(&input)?;
1323 let conn_id = string_to_id(&conn_id_str)?;
1324
1325 // Take the stream out of the map first, holding the
1326 // outer lock only long enough to validate ownership
1327 // and remove the entry.
1328 let stream_arc = {
1329 let mut connections = st.connections.lock().await;
1330 let entry = connections.get(&conn_id).ok_or_else(|| {
1331 Value::String(format!("Connection not found: {}", conn_id_str))
1332 })?;
1333 if entry.owner != actor_id {
1334 return Err(Value::String(format!(
1335 "Connection {} not owned by this actor",
1336 conn_id_str
1337 )));
1338 }
1339 let arc = entry.stream.clone();
1340 connections.remove(&conn_id);
1341 arc
1342 };
1343
1344 // Move the stream out and call shutdown on the write
1345 // side. Errors are non-fatal — peer may already have
1346 // closed — but we log them at WARN so production can
1347 // tell apart a clean close_notify-sent path from a
1348 // shutdown failure that drops bytes (ticket #10).
1349 let mut guard = stream_arc.lock().await;
1350 let taken = std::mem::replace(&mut *guard, StreamState::Closed);
1351 drop(guard);
1352 let shutdown_result = match taken {
1353 StreamState::Full(mut s) => {
1354 Some(AsyncWriteExt::shutdown(&mut *s).await)
1355 }
1356 StreamState::WriteOnly(mut w) => {
1357 Some(AsyncWriteExt::shutdown(&mut w).await)
1358 }
1359 StreamState::Closed => None,
1360 };
1361 match shutdown_result {
1362 Some(Ok(())) => {
1363 debug!("tcp close conn={} (graceful shutdown ok)", conn_id);
1364 }
1365 Some(Err(e)) => {
1366 warn!(
1367 "tcp close conn={} shutdown error (close_notify may not have been sent): {}",
1368 conn_id, e
1369 );
1370 }
1371 None => {
1372 debug!("tcp close conn={} (already closed)", conn_id);
1373 }
1374 }
1375 Ok::<Value, Value>(Value::Tuple(vec![]))
1376 }
1377 },
1378 )?
1379 // ----------------------------------------------------------------
1380 // upgrade-to-tls-server(connection-id: string) -> result<_, string>
1381 //
1382 // For STARTTLS-style protocols: the actor accepts a plain TCP
1383 // connection, exchanges a few protocol lines, then calls this to
1384 // wrap the existing stream with TLS using the server_tls cert
1385 // configured on this handler. After this returns Ok, the same
1386 // connection-id transports TLS-encrypted bytes.
1387 // ----------------------------------------------------------------
1388 .func_async_result(
1389 "upgrade-to-tls-server",
1390 move |_ctx: AsyncCtx<ActorStore>, input: Value| {
1391 let st = st_upgrade_server.clone();
1392 let actor_id = aid_upgrade_server;
1393 let tls_ctx = tls_for_upgrade_server.clone();
1394 async move {
1395 let _ph = PhaseLog::new("tcp.upgrade_to_tls_server");
1396 let conn_id_str = parse_string(&input)?;
1397 let conn_id = string_to_id(&conn_id_str)?;
1398
1399 let acceptor = match tls_ctx.as_ref() {
1400 Some(ctx) => match &ctx.server_acceptor {
1401 Some(a) => a.clone(),
1402 None => {
1403 return Err(Value::String(
1404 "server_tls not configured on this handler".into(),
1405 ))
1406 }
1407 },
1408 None => {
1409 return Err(Value::String(
1410 "server_tls not configured on this handler".into(),
1411 ))
1412 }
1413 };
1414
1415 let stream_arc = {
1416 let connections = st.connections.lock().await;
1417 let entry = connections.get(&conn_id).ok_or_else(|| {
1418 Value::String(format!("Connection not found: {}", conn_id_str))
1419 })?;
1420 if entry.owner != actor_id {
1421 return Err(Value::String(format!(
1422 "Connection {} not owned by this actor",
1423 conn_id_str
1424 )));
1425 }
1426 if entry.state != ConnectionState::Active {
1427 return Err(Value::String(format!(
1428 "Connection {} must be activated before TLS upgrade",
1429 conn_id_str
1430 )));
1431 }
1432 if entry.data_mode != DataMode::Passive {
1433 return Err(Value::String(format!(
1434 "Connection {} must be in passive mode for TLS upgrade",
1435 conn_id_str
1436 )));
1437 }
1438 entry.stream.clone()
1439 };
1440
1441 let mut guard = stream_arc.lock().await;
1442 let taken = std::mem::replace(&mut *guard, StreamState::Closed);
1443 let inner = match taken {
1444 StreamState::Full(boxed) => *boxed,
1445 StreamState::WriteOnly(w) => {
1446 *guard = StreamState::WriteOnly(w);
1447 return Err(Value::String(format!(
1448 "Connection {} is split; TLS upgrade not supported",
1449 conn_id_str
1450 )));
1451 }
1452 StreamState::Closed => {
1453 return Err(Value::String(format!(
1454 "Connection {} is closed",
1455 conn_id_str
1456 )));
1457 }
1458 };
1459 let tcp = match inner {
1460 UnifiedStream::Plain(tcp) => tcp,
1461 other => {
1462 *guard = StreamState::Full(Box::new(other));
1463 return Err(Value::String(format!(
1464 "Connection {} is already TLS",
1465 conn_id_str
1466 )));
1467 }
1468 };
1469
1470 let tls_stream = match acceptor.accept(tcp).await {
1471 Ok(s) => s,
1472 Err(e) => {
1473 // Stream is gone — leave entry as Closed.
1474 return Err(Value::String(format!(
1475 "TLS server handshake failed: {}",
1476 e
1477 )));
1478 }
1479 };
1480 *guard = StreamState::Full(Box::new(UnifiedStream::ServerTls(tls_stream)));
1481 drop(guard);
1482
1483 debug!("tcp upgrade-to-tls-server conn={}", conn_id);
1484 Ok::<Value, Value>(Value::Tuple(vec![]))
1485 }
1486 },
1487 )?
1488 // ----------------------------------------------------------------
1489 // upgrade-to-tls-client(connection-id, server-name) -> result<_, string>
1490 //
1491 // The client-side mirror of upgrade-to-tls-server: wraps an
1492 // existing plain TCP connection with TLS using the client_tls
1493 // config. server-name is used for SNI and cert verification.
1494 // ----------------------------------------------------------------
1495 .func_async_result(
1496 "upgrade-to-tls-client",
1497 move |_ctx: AsyncCtx<ActorStore>, input: Value| {
1498 let st = st_upgrade_client.clone();
1499 let actor_id = aid_upgrade_client;
1500 let tls_ctx = tls_for_upgrade_client.clone();
1501 async move {
1502 let _ph = PhaseLog::new("tcp.upgrade_to_tls_client");
1503 let (conn_id_str, server_name_str) = parse_two_strings(&input)?;
1504 let conn_id = string_to_id(&conn_id_str)?;
1505
1506 let connector = match tls_ctx.as_ref() {
1507 Some(ctx) => match &ctx.client_connector {
1508 Some(c) => c.clone(),
1509 None => {
1510 return Err(Value::String(
1511 "client_tls not configured on this handler".into(),
1512 ))
1513 }
1514 },
1515 None => {
1516 return Err(Value::String(
1517 "client_tls not configured on this handler".into(),
1518 ))
1519 }
1520 };
1521
1522 let server_name =
1523 rustls::pki_types::ServerName::try_from(server_name_str.clone())
1524 .map_err(|e| {
1525 Value::String(format!(
1526 "Invalid server name {:?}: {}",
1527 server_name_str, e
1528 ))
1529 })?;
1530
1531 let stream_arc = {
1532 let connections = st.connections.lock().await;
1533 let entry = connections.get(&conn_id).ok_or_else(|| {
1534 Value::String(format!("Connection not found: {}", conn_id_str))
1535 })?;
1536 if entry.owner != actor_id {
1537 return Err(Value::String(format!(
1538 "Connection {} not owned by this actor",
1539 conn_id_str
1540 )));
1541 }
1542 if entry.state != ConnectionState::Active {
1543 return Err(Value::String(format!(
1544 "Connection {} must be activated before TLS upgrade",
1545 conn_id_str
1546 )));
1547 }
1548 if entry.data_mode != DataMode::Passive {
1549 return Err(Value::String(format!(
1550 "Connection {} must be in passive mode for TLS upgrade",
1551 conn_id_str
1552 )));
1553 }
1554 entry.stream.clone()
1555 };
1556
1557 let mut guard = stream_arc.lock().await;
1558 let taken = std::mem::replace(&mut *guard, StreamState::Closed);
1559 let inner = match taken {
1560 StreamState::Full(boxed) => *boxed,
1561 StreamState::WriteOnly(w) => {
1562 *guard = StreamState::WriteOnly(w);
1563 return Err(Value::String(format!(
1564 "Connection {} is split; TLS upgrade not supported",
1565 conn_id_str
1566 )));
1567 }
1568 StreamState::Closed => {
1569 return Err(Value::String(format!(
1570 "Connection {} is closed",
1571 conn_id_str
1572 )));
1573 }
1574 };
1575 let tcp = match inner {
1576 UnifiedStream::Plain(tcp) => tcp,
1577 other => {
1578 *guard = StreamState::Full(Box::new(other));
1579 return Err(Value::String(format!(
1580 "Connection {} is already TLS",
1581 conn_id_str
1582 )));
1583 }
1584 };
1585
1586 let tls_stream = match connector.connect(server_name, tcp).await {
1587 Ok(s) => s,
1588 Err(e) => {
1589 return Err(Value::String(format!(
1590 "TLS client handshake failed: {}",
1591 e
1592 )));
1593 }
1594 };
1595 *guard = StreamState::Full(Box::new(UnifiedStream::ClientTls(tls_stream)));
1596 drop(guard);
1597
1598 debug!(
1599 "tcp upgrade-to-tls-client conn={} server_name={}",
1600 conn_id, server_name_str
1601 );
1602 Ok::<Value, Value>(Value::Tuple(vec![]))
1603 }
1604 },
1605 )?
1606 // ----------------------------------------------------------------
1607 // close-listener(listener-id: string) -> result<_, string>
1608 // ----------------------------------------------------------------
1609 .func_async_result(
1610 "close-listener",
1611 move |_ctx: AsyncCtx<ActorStore>, input: Value| {
1612 let st = st_close_listener.clone();
1613 let actor_id = aid_close_listener;
1614 async move {
1615 let _ph = PhaseLog::new("tcp.close_listener");
1616 let listener_id_str = parse_string(&input)?;
1617 let listener_id = string_to_id(&listener_id_str)?;
1618
1619 let mut listeners = st.listeners.lock().await;
1620 let entry = listeners.get(&listener_id).ok_or_else(|| {
1621 Value::String(format!("Listener not found: {}", listener_id_str))
1622 })?;
1623
1624 if entry.owner != actor_id {
1625 return Err(Value::String(format!(
1626 "Listener {} not owned by this actor",
1627 listener_id_str
1628 )));
1629 }
1630
1631 listeners.remove(&listener_id);
1632 debug!("tcp close listener={}", listener_id);
1633 Ok::<Value, Value>(Value::Tuple(vec![]))
1634 }
1635 },
1636 )?;
1637
1638 ctx.mark_satisfied("theater:simple/tcp");
1639 info!("TCP host functions (Pack) set up successfully");
1640 Ok(())
1641 }
1642
1643 fn supports_composite(&self) -> bool {
1644 true
1645 }
1646}
1647
1648// ============================================================================
1649// Active Mode Read Loop
1650// ============================================================================
1651
/// Capacity, in bytes, of the scratch buffer `tcp_read_loop` reads into (8 KiB).
const ACTIVE_READ_BUFFER_SIZE: usize = 8 * 1024;
1654
1655/// Remove the connection entry from the shared map and, for TLS streams in
1656/// the `WriteOnly(write_half)` state, flush the close_notify alert before the
1657/// underlying TCP gets FIN'd.
1658///
1659/// Used by `tcp_read_loop` on every cleanup branch (peer-EOF, read error,
1660/// cancellation). Without the shutdown call, dropping the WriteOnly write
1661/// half drops the rustls session without flushing its outgoing close_notify
1662/// alert — peers then observe a bare TCP FIN and surface it as
1663/// `UnexpectedEof` ("peer closed connection without sending TLS
1664/// close_notify"). The explicit `close` host function had this right
1665/// already (PR #45); this is the parallel fix for the auto-cleanup path.
1666async fn shutdown_write_half_and_remove(shared_state: &Arc<SharedTcpState>, conn_id: u64) {
1667 let stream_arc = {
1668 let mut connections = shared_state.connections.lock().await;
1669 connections.remove(&conn_id).map(|e| e.stream)
1670 };
1671 let Some(stream_arc) = stream_arc else {
1672 return;
1673 };
1674 let mut guard = stream_arc.lock().await;
1675 let taken = std::mem::replace(&mut *guard, StreamState::Closed);
1676 drop(guard);
1677 match taken {
1678 StreamState::WriteOnly(mut w) => {
1679 if let Err(e) = AsyncWriteExt::shutdown(&mut w).await {
1680 warn!(
1681 "tcp conn={} cleanup shutdown error (close_notify may not have been sent): {}",
1682 conn_id, e
1683 );
1684 }
1685 }
1686 StreamState::Full(mut s) => {
1687 // Active-mode normally leaves the entry in WriteOnly state, but
1688 // handle Full defensively in case the reader exits before set-active
1689 // could complete the split.
1690 if let Err(e) = AsyncWriteExt::shutdown(&mut *s).await {
1691 warn!(
1692 "tcp conn={} cleanup shutdown error (close_notify may not have been sent): {}",
1693 conn_id, e
1694 );
1695 }
1696 }
1697 StreamState::Closed => {}
1698 }
1699}
1700
1701/// Background task that reads from a connection and calls on-data/on-close callbacks.
1702///
1703/// This is spawned when a connection enters "active" or "once" mode.
1704async fn tcp_read_loop(
1705 conn_id: u64,
1706 mut read_half: UnifiedReadHalf,
1707 actor_handle: ActorHandle,
1708 shared_state: Arc<SharedTcpState>,
1709 is_once: bool,
1710 cancel_token: CancellationToken,
1711) {
1712 let conn_id_str = id_to_string(conn_id);
1713 info!(
1714 "tcp read loop started for conn={} (once={})",
1715 conn_id, is_once
1716 );
1717
1718 let mut buf = vec![0u8; ACTIVE_READ_BUFFER_SIZE];
1719
1720 loop {
1721 tokio::select! {
1722 _ = cancel_token.cancelled() => {
1723 info!("tcp read loop cancelled for conn={}", conn_id);
1724 shutdown_write_half_and_remove(&shared_state, conn_id).await;
1725 break;
1726 }
1727 result = read_half.read(&mut buf) => {
1728 match result {
1729 Ok(0) => {
1730 // EOF - connection closed by peer
1731 info!("tcp conn={} received EOF", conn_id);
1732
1733 // Call on-close callback
1734 let params = Value::Tuple(vec![
1735 Value::String(conn_id_str.clone()),
1736 Value::String("eof".to_string()),
1737 ]);
1738
1739 if let Err(e) = actor_handle
1740 .call_function("theater:simple/tcp-client.on-close".to_string(), params)
1741 .await
1742 {
1743 warn!("tcp conn={} on-close callback failed: {}", conn_id, e);
1744 }
1745
1746 shutdown_write_half_and_remove(&shared_state, conn_id).await;
1747 break;
1748 }
1749 Ok(n) => {
1750 // Data received - call on-data callback
1751 let data = buf[..n].to_vec();
1752 debug!("tcp conn={} received {} bytes, calling on-data", conn_id, n);
1753
1754 let params = Value::Tuple(vec![
1755 Value::String(conn_id_str.clone()),
1756 Value::List {
1757 elem_type: ValueType::U8,
1758 items: data.into_iter().map(Value::U8).collect(),
1759 },
1760 ]);
1761
1762 if let Err(e) = actor_handle
1763 .call_function("theater:simple/tcp-client.on-data".to_string(), params)
1764 .await
1765 {
1766 error!("tcp conn={} on-data callback failed: {}", conn_id, e);
1767 // Continue reading even if callback fails
1768 }
1769
1770 if is_once {
1771 // Once mode: switch back to passive after one read
1772 info!("tcp conn={} once mode complete, switching to passive", conn_id);
1773
1774 // Update the connection's data mode
1775 if let Some(entry) = shared_state.connections.lock().await.get_mut(&conn_id) {
1776 entry.data_mode = DataMode::Passive;
1777 }
1778 break;
1779 }
1780 }
1781 Err(e) => {
1782 // Read error - connection broken
1783 error!("tcp conn={} read error: {}", conn_id, e);
1784
1785 // Call on-close callback with error
1786 let params = Value::Tuple(vec![
1787 Value::String(conn_id_str.clone()),
1788 Value::String(e.to_string()),
1789 ]);
1790
1791 if let Err(e) = actor_handle
1792 .call_function("theater:simple/tcp-client.on-close".to_string(), params)
1793 .await
1794 {
1795 warn!("tcp conn={} on-close callback failed: {}", conn_id, e);
1796 }
1797
1798 shutdown_write_half_and_remove(&shared_state, conn_id).await;
1799 break;
1800 }
1801 }
1802 }
1803 }
1804 }
1805
1806 info!("tcp read loop stopped for conn={}", conn_id);
1807}
1808
#[cfg(test)]
mod tests {
    use super::*;

    /// A default-configured handler reports its name and its import/export
    /// interface lists.
    #[test]
    fn test_tcp_handler_creation() {
        let handler = TcpHandler::new(TcpHandlerConfig::default());

        assert_eq!(handler.name(), "tcp");

        let expected_imports = Some(vec!["theater:simple/tcp".to_string()]);
        assert_eq!(handler.imports(), expected_imports);

        let expected_exports = Some(vec!["theater:simple/tcp-client".to_string()]);
        assert_eq!(handler.exports(), expected_exports);
    }

    /// An instance created from a handler still identifies itself as "tcp".
    #[test]
    fn test_tcp_handler_clone_shares_state() {
        let original = TcpHandler::new(TcpHandlerConfig::default());
        let instance = original.create_instance(None);

        // Only the name is checked here: whether the shared-state Arc is the
        // same allocation cannot be observed without exposing internals.
        assert_eq!(instance.name(), "tcp");
    }

    /// Building the interface twice yields the same hash.
    #[test]
    fn test_tcp_interface_hash_determinism() {
        let first_hash = tcp_interface().hash();
        let second_hash = tcp_interface().hash();
        assert_eq!(first_hash, second_hash);
    }

    /// The handler advertises exactly one interface hash, for
    /// "theater:simple/tcp", and that hash is not all zero bytes.
    #[test]
    fn test_tcp_handler_interface_hashes() {
        let handler = TcpHandler::new(TcpHandlerConfig::default());
        let hashes = handler.interface_hashes();

        assert_eq!(hashes.len(), 1);
        assert_eq!(hashes[0].0, "theater:simple/tcp");

        // At least one byte of the hash must be non-zero.
        assert!(hashes[0].1.as_bytes().iter().any(|&byte| byte != 0));
    }

    /// The two connection states compare as different values.
    #[test]
    fn test_connection_state_enum() {
        assert_ne!(ConnectionState::Pending, ConnectionState::Active);
    }

    /// All three data modes are pairwise distinct.
    #[test]
    fn test_data_mode_enum() {
        assert_ne!(DataMode::Passive, DataMode::Active);
        assert_ne!(DataMode::Passive, DataMode::Once);
        assert_ne!(DataMode::Active, DataMode::Once);
    }
}