roplat 0.2.0

roplat: just a robot operation system
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
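//! Cross-platform loopback TCP backend.
//!
//! The default backend for the MVP stage. Frame format: `u32` big-endian length prefix + payload.
//!
//! Structure:
//! - Publisher: `bind(127.0.0.1:0)` → background `accept` loop → one outbox queue and one
//!   writer thread per subscriber; `publish` clones the bytes into every peer queue, giving a
//!   true one-writer/many-readers broadcast.
//! - Subscriber: `connect(addr)` → `Hello`/`HelloAck` handshake → a reader + writer thread pair.
//!
//! Threading model: the backend owns its send and receive threads; the semantic layer calls
//! `publish` / `try_recv`, neither of which performs socket I/O itself.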

use super::super::endpoint::{EndpointUri, Role};
use super::super::handshake::{Hello, HelloAck};
use super::super::rendezvous::{RendezvousDescriptor, RendezvousDir};
use super::super::ring_buffer::{OverflowPolicy, TcpOptions};
use super::super::transport::{IpcError, IpcResult, IpcTransport};

use std::collections::VecDeque;
use std::io::{BufRead, BufReader, Read, Write};
use std::net::{Shutdown, SocketAddr, TcpListener, TcpStream};
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use std::sync::{Arc, Mutex};
use std::thread::{self, JoinHandle};
use std::time::Duration;

/// Returns how long an idle writer thread sleeps between polls of its outbox.
///
/// The value comes from the `ROPLAT_IPC_TX_IDLE_US` environment variable
/// (microseconds) and is resolved once, then cached for the rest of the process.
/// A missing or unparsable value falls back to 1000 µs. An explicit `0` yields
/// `None`, meaning "do not sleep".
fn tx_idle_wait_duration() -> Option<Duration> {
    static TX_IDLE_WAIT: std::sync::OnceLock<Option<Duration>> = std::sync::OnceLock::new();
    *TX_IDLE_WAIT.get_or_init(|| {
        let micros = match std::env::var("ROPLAT_IPC_TX_IDLE_US") {
            Ok(raw) => raw.parse::<u64>().unwrap_or(1_000),
            Err(_) => 1_000,
        };
        match micros {
            0 => None,
            n => Some(Duration::from_micros(n)),
        }
    })
}

/// Shared FIFO of raw message payloads, handed between the API caller and the I/O threads.
type ByteQueue = Arc<Mutex<VecDeque<Vec<u8>>>>;

/// Send-side strategy; determines where `publish` enqueues outgoing bytes.
enum SendSide {
    /// Publisher: one independent outbox queue per connected subscriber (broadcast).
    Publisher(Arc<Mutex<Vec<ByteQueue>>>),
    /// Subscriber: a single outbox queue.
    Subscriber(ByteQueue),
}

/// TCP backend; an instance is created either as a publisher or as a subscriber.
pub struct TcpTransport {
    /// Role this endpoint was created with; selects the readiness rule in `is_ready`.
    role: Role,
    // Stored at construction; not read by the code visible in this file, hence the lint allow.
    #[allow(dead_code)]
    uri: EndpointUri,
    /// Where `publish` enqueues outgoing payloads (per-peer queues or a single outbox).
    send_side: SendSide,
    /// Receive side: background threads push received payloads here; `try_recv` pops from it.
    inbox: ByteQueue,
    /// Connection counter shared with the I/O threads (starts at 1 for a subscriber).
    connected_peers: Arc<AtomicUsize>,
    /// Set in `Drop` to ask every background loop to exit.
    shutdown: Arc<AtomicBool>,
    /// Handles of the threads that `Drop` joins.
    threads: Mutex<Vec<JoinHandle<()>>>,
    /// Publisher only: rendezvous directory and URI, used for cleanup in `Drop`.
    rendezvous: Option<(RendezvousDir, EndpointUri)>,
    /// Publisher only: TCP options (back-pressure watermark and overflow policy).
    tcp_opts: TcpOptions,
}

impl TcpTransport {
    /// Publisher side: binds a local port, writes the rendezvous descriptor and
    /// returns the transport. Uses the default [`TcpOptions`].
    pub fn bind_publisher(uri: &EndpointUri, rdv: RendezvousDir) -> IpcResult<Self> {
        Self::bind_publisher_with_opts(uri, rdv, TcpOptions::default())
    }

    /// Publisher side, with explicit back-pressure options.
    ///
    /// Binds `127.0.0.1` on an OS-assigned port, publishes a `"tcp"` rendezvous
    /// descriptor carrying that address, and starts the accept thread.
    ///
    /// # Errors
    /// Propagates socket errors, rendezvous publication errors, and thread-spawn
    /// failures. If the accept thread cannot be spawned, the descriptor that was
    /// just published is withdrawn again so no stale entry points at a dead port.
    pub fn bind_publisher_with_opts(
        uri: &EndpointUri,
        rdv: RendezvousDir,
        tcp_opts: TcpOptions,
    ) -> IpcResult<Self> {
        let listener = TcpListener::bind(("127.0.0.1", 0))?;
        let local_addr: SocketAddr = listener.local_addr()?;
        // Non-blocking accept lets the accept loop poll the shutdown flag.
        listener.set_nonblocking(true)?;

        let desc = RendezvousDescriptor::new(uri, "tcp", local_addr.to_string());
        rdv.publish(&desc)?;

        let peer_queues: Arc<Mutex<Vec<ByteQueue>>> = Arc::new(Mutex::new(Vec::new()));
        let inbox: ByteQueue = Arc::new(Mutex::new(VecDeque::new()));
        let connected_peers = Arc::new(AtomicUsize::new(0));
        let shutdown = Arc::new(AtomicBool::new(false));

        let spawn_result = {
            let peer_queues = peer_queues.clone();
            let inbox = inbox.clone();
            let connected_peers = connected_peers.clone();
            let shutdown = shutdown.clone();
            let uri = uri.clone();
            thread::Builder::new()
                .name("roplat-ipc-tcp-accept".into())
                .spawn(move || {
                    accept_loop(listener, uri, peer_queues, inbox, connected_peers, shutdown);
                })
        };
        let accept_thread = match spawn_result {
            Ok(handle) => handle,
            Err(e) => {
                // Nobody will ever accept on this port: undo the publication (best effort).
                let _ = rdv.withdraw(uri);
                return Err(IpcError::Io(e));
            }
        };

        Ok(Self {
            role: Role::Publisher,
            uri: uri.clone(),
            send_side: SendSide::Publisher(peer_queues),
            inbox,
            connected_peers,
            shutdown,
            threads: Mutex::new(vec![accept_thread]),
            rendezvous: Some((rdv, uri.clone())),
            tcp_opts,
        })
    }

    /// Subscriber side: looks the publisher up through the rendezvous directory,
    /// performs the `Hello`/`HelloAck` handshake and returns the transport.
    ///
    /// Per the original note on this function, a rendezvous file that does not
    /// exist yet is reported as `IpcError::NotReady` so the caller can retry
    /// (that behaviour lives in `rdv.lookup`, which is not visible here).
    ///
    /// # Errors
    /// - `IpcError::Protocol` when the descriptor names another backend, the
    ///   address cannot be parsed, or the publisher rejects the hello.
    /// - I/O errors from connecting or from the handshake; the wait for the ack
    ///   is bounded by a 2-second read timeout so a silent peer cannot block forever.
    /// - Thread-spawn failures; an already-started reader thread is stopped and
    ///   joined before the error is returned.
    pub fn connect_subscriber(uri: &EndpointUri, rdv: &RendezvousDir) -> IpcResult<Self> {
        let desc = rdv.lookup(uri)?;
        if desc.transport != "tcp" {
            return Err(IpcError::Protocol(format!(
                "backend mismatch: rendezvous says {}, client wants tcp",
                desc.transport
            )));
        }
        let addr: SocketAddr = desc
            .address
            .parse()
            .map_err(|e: std::net::AddrParseError| IpcError::Protocol(e.to_string()))?;

        let mut stream = TcpStream::connect_timeout(&addr, Duration::from_secs(2))?;
        stream.set_nodelay(true)?;
        // Bound the handshake: without this, a peer that accepts but never answers
        // would block the caller indefinitely. Mirrors the publisher-side timeout.
        stream.set_read_timeout(Some(Duration::from_secs(2)))?;

        let hello = Hello::new(
            Role::Subscriber,
            uri.schema_id.clone(),
            uri.msg_version,
            format!("{}/{}", uri.namespace, uri.name),
        );
        send_json_line(&mut stream, &hello)?;
        let ack: HelloAck = recv_json_line(&mut stream)?;
        if !ack.accepted {
            return Err(IpcError::Protocol(
                ack.reason.unwrap_or_else(|| "hello rejected".to_string()),
            ));
        }

        let outbox: ByteQueue = Arc::new(Mutex::new(VecDeque::new()));
        let inbox: ByteQueue = Arc::new(Mutex::new(VecDeque::new()));
        let connected_peers = Arc::new(AtomicUsize::new(1));
        let shutdown = Arc::new(AtomicBool::new(false));

        // Short timeout so the reader thread can poll the shutdown flag between frames.
        stream.set_read_timeout(Some(Duration::from_millis(50)))?;
        let write_stream = stream.try_clone()?;

        let read_thread = spawn_reader(
            stream,
            inbox.clone(),
            connected_peers.clone(),
            shutdown.clone(),
            "roplat-ipc-tcp-sub-rx",
        )?;
        let write_thread = match spawn_writer_sub(
            write_stream,
            outbox.clone(),
            connected_peers.clone(),
            shutdown.clone(),
        ) {
            Ok(handle) => handle,
            Err(e) => {
                // Do not leave the reader running against a transport that never existed.
                shutdown.store(true, Ordering::Release);
                let _ = read_thread.join();
                return Err(e);
            }
        };

        Ok(Self {
            role: Role::Subscriber,
            uri: uri.clone(),
            send_side: SendSide::Subscriber(outbox),
            inbox,
            connected_peers,
            shutdown,
            threads: Mutex::new(vec![read_thread, write_thread]),
            rendezvous: None,
            tcp_opts: TcpOptions::default(),
        })
    }
}

impl IpcTransport for TcpTransport {
    /// Backend identifier.
    fn kind(&self) -> &'static str {
        "tcp"
    }

    /// Enqueues `bytes` for sending; the actual socket write happens on a writer thread.
    ///
    /// Publisher: a copy of `bytes` is appended to every connected peer's outbox.
    /// When `tcp_opts.high_watermark` is set and a peer's queue has reached it,
    /// `tcp_opts.overflow` decides what happens for that peer:
    /// - `DropOldest`: the oldest frames are discarded until there is room, then the
    ///   new frame is queued. With a watermark of 0 the queue is emptied and holds
    ///   only the newest frame (previously this case looped forever).
    /// - `DropNewest`: the new frame is skipped for that peer only.
    /// - `Error`: returns `IpcError::Protocol` immediately. NOTE: peers visited
    ///   earlier in the loop have already been given the frame at that point.
    ///
    /// Subscriber: the frame is appended to the single outbox; no watermark is applied.
    fn publish(&self, bytes: &[u8]) -> IpcResult<()> {
        match &self.send_side {
            SendSide::Publisher(queues) => {
                let guard = queues.lock().expect("peer_queues poisoned");
                for q in guard.iter() {
                    let mut oq = q.lock().expect("peer outbox poisoned");
                    if let Some(hw) = self.tcp_opts.high_watermark {
                        if oq.len() >= hw {
                            match self.tcp_opts.overflow {
                                OverflowPolicy::DropOldest => {
                                    // Stop once the queue is empty: with hw == 0 the
                                    // length test alone can never become false.
                                    while oq.len() >= hw && oq.pop_front().is_some() {}
                                }
                                OverflowPolicy::DropNewest => continue,
                                OverflowPolicy::Error => {
                                    return Err(IpcError::Protocol(format!(
                                        "tcp outbox full (high_watermark={hw})"
                                    )));
                                }
                            }
                        }
                    }
                    oq.push_back(bytes.to_vec());
                }
                Ok(())
            }
            SendSide::Subscriber(outbox) => {
                outbox
                    .lock()
                    .expect("outbox poisoned")
                    .push_back(bytes.to_vec());
                Ok(())
            }
        }
    }

    /// Pops the oldest received payload, or `None` when the inbox is empty. Never blocks on I/O.
    fn try_recv(&self) -> IpcResult<Option<Vec<u8>>> {
        Ok(self.inbox.lock().expect("inbox poisoned").pop_front())
    }

    /// Publisher: ready while at least one peer is counted as connected.
    /// Subscriber: ready until shutdown is requested; this does not consult the
    /// connection counter, so it does not reflect a lost connection.
    fn is_ready(&self) -> bool {
        match self.role {
            Role::Publisher => self.connected_peers.load(Ordering::Acquire) > 0,
            Role::Subscriber => !self.shutdown.load(Ordering::Acquire),
        }
    }
}

impl Drop for TcpTransport {
    /// Signals shutdown, withdraws the rendezvous entry (publisher only, best
    /// effort) and joins the owned threads, most recently stored first.
    fn drop(&mut self) {
        self.shutdown.store(true, Ordering::Release);

        if let Some((dir, endpoint)) = self.rendezvous.as_ref() {
            let _ = dir.withdraw(endpoint);
        }

        // `&mut self` guarantees exclusive access, so no locking is needed;
        // a poisoned mutex is skipped exactly as a failed `lock()` would be.
        if let Ok(handles) = self.threads.get_mut() {
            for handle in handles.drain(..).rev() {
                let _ = handle.join();
            }
        }
    }
}

// ============================================================
// Accept loop (publisher side)
// ============================================================

/// Publisher-side accept loop; runs until `shutdown` is set.
///
/// For every accepted connection it:
/// 1. validates the peer's `Hello` (sending a rejecting `HelloAck` and closing on failure),
/// 2. registers a fresh outbox in `peer_queues` and bumps `connected_peers`,
/// 3. starts a writer thread that drains that outbox and unregisters it on exit,
/// 4. starts a reader thread that feeds received frames into `inbox`.
///
/// The listener is expected to be non-blocking: `WouldBlock` is answered with a
/// 20 ms sleep, any other accept error with a 50 ms sleep.
fn accept_loop(
    listener: TcpListener,
    uri: EndpointUri,
    peer_queues: Arc<Mutex<Vec<ByteQueue>>>,
    inbox: ByteQueue,
    connected_peers: Arc<AtomicUsize>,
    shutdown: Arc<AtomicBool>,
) {
    while !shutdown.load(Ordering::Acquire) {
        match listener.accept() {
            Ok((mut stream, _addr)) => {
                // On some platforms an accepted socket inherits the listener's
                // non-blocking mode, which would make the handshake read fail with
                // `WouldBlock` right away. Force blocking mode; the read timeouts
                // set below provide the bounded waits.
                if stream.set_nonblocking(false).is_err() {
                    let _ = stream.shutdown(Shutdown::Both);
                    continue;
                }

                if let Err(e) = handle_peer_handshake(&mut stream, &uri) {
                    let _ = send_json_line(&mut stream, &HelloAck::reject(e.to_string()));
                    let _ = stream.shutdown(Shutdown::Both);
                    continue;
                }
                let _ = send_json_line(&mut stream, &HelloAck::ok());
                // Short timeout so the reader thread can poll the shutdown flag.
                let _ = stream.set_read_timeout(Some(Duration::from_millis(50)));
                let _ = stream.set_nodelay(true);

                let write_stream = match stream.try_clone() {
                    Ok(s) => s,
                    Err(_) => continue,
                };

                let peer_outbox: ByteQueue = Arc::new(Mutex::new(VecDeque::new()));
                peer_queues
                    .lock()
                    .expect("peer_queues poisoned")
                    .push(peer_outbox.clone());
                connected_peers.fetch_add(1, Ordering::AcqRel);

                let writer_queues = peer_queues.clone();
                let writer_outbox = peer_outbox.clone();
                let shutdown_w = shutdown.clone();
                let connected_peers_w = connected_peers.clone();
                if thread::Builder::new()
                    .name("roplat-ipc-tcp-pub-tx".into())
                    .spawn(move || {
                        writer_loop_pub(write_stream, writer_outbox.clone(), shutdown_w);
                        // Writer finished: unregister this peer's queue and its count.
                        let mut guard = writer_queues.lock().expect("peer_queues poisoned");
                        guard.retain(|q| !Arc::ptr_eq(q, &writer_outbox));
                        connected_peers_w.fetch_sub(1, Ordering::AcqRel);
                    })
                    .is_err()
                {
                    // Spawn failed: roll back the registration done above.
                    let mut guard = peer_queues.lock().expect("peer_queues poisoned");
                    guard.retain(|q| !Arc::ptr_eq(q, &peer_outbox));
                    connected_peers.fetch_sub(1, Ordering::AcqRel);
                    continue;
                }

                let _ = spawn_reader(
                    stream,
                    inbox.clone(),
                    // The publisher's reader must not touch `connected_peers`; the writer
                    // maintains it on exit, so a throwaway counter avoids a double decrement.
                    Arc::new(AtomicUsize::new(0)),
                    shutdown.clone(),
                    "roplat-ipc-tcp-pub-rx",
                );
            }
            Err(ref e) if e.kind() == std::io::ErrorKind::WouldBlock => {
                thread::sleep(Duration::from_millis(20));
            }
            Err(_) => {
                thread::sleep(Duration::from_millis(50));
            }
        }
    }
}

/// Reads the peer's `Hello` line (2-second read timeout) and validates it against `uri`.
///
/// Checks run in this order and the first failure wins: magic, protocol version,
/// role (must be subscriber), schema id, message version.
fn handle_peer_handshake(stream: &mut TcpStream, uri: &EndpointUri) -> IpcResult<()> {
    stream.set_read_timeout(Some(Duration::from_secs(2)))?;
    let hello: Hello = recv_json_line(stream)?;

    let rejection = if hello.magic != Hello::MAGIC {
        Some(IpcError::Protocol(format!("bad magic: {}", hello.magic)))
    } else if hello.protocol_version != Hello::PROTOCOL_VERSION {
        Some(IpcError::Protocol(format!(
            "protocol version mismatch: expected {}, got {}",
            Hello::PROTOCOL_VERSION,
            hello.protocol_version
        )))
    } else if hello.role != Role::Subscriber {
        Some(IpcError::RoleMismatch {
            expected: Role::Subscriber.as_str().to_string(),
            actual: hello.role.as_str().to_string(),
        })
    } else if hello.schema_id != uri.schema_id {
        Some(IpcError::SchemaMismatch {
            expected: uri.schema_id.to_string(),
            actual: hello.schema_id.to_string(),
        })
    } else if hello.msg_version != uri.msg_version {
        Some(IpcError::Protocol(format!(
            "msg_version mismatch: expected {}, got {}",
            uri.msg_version, hello.msg_version
        )))
    } else {
        None
    };

    match rejection {
        Some(err) => Err(err),
        None => Ok(()),
    }
}

// ============================================================
// Reader / writer threads
// ============================================================

/// Spawns a named thread that reads frames from `stream` into `inbox`.
///
/// The loop ends when `shutdown` is set or when `read_frame` returns an error.
/// On error `connected_peers` is decremented, saturating at zero: the counter may
/// already be 0 (a throwaway counter, or a sibling thread decremented first), and
/// a plain `fetch_sub` would wrap it to `usize::MAX`.
///
/// # Errors
/// Returns `IpcError::Io` if the thread cannot be spawned.
fn spawn_reader(
    mut stream: TcpStream,
    inbox: ByteQueue,
    connected_peers: Arc<AtomicUsize>,
    shutdown: Arc<AtomicBool>,
    name: &str,
) -> IpcResult<JoinHandle<()>> {
    thread::Builder::new()
        .name(name.to_string())
        .spawn(move || {
            while !shutdown.load(Ordering::Acquire) {
                match read_frame(&mut stream) {
                    Ok(Some(buf)) => {
                        inbox.lock().expect("inbox poisoned").push_back(buf);
                    }
                    // Nothing arrived within the read timeout; re-check the shutdown flag.
                    Ok(None) => {}
                    Err(_) => {
                        // `checked_sub` returns `None` at zero, leaving the counter untouched.
                        let _ = connected_peers.fetch_update(
                            Ordering::AcqRel,
                            Ordering::Acquire,
                            |n| n.checked_sub(1),
                        );
                        break;
                    }
                }
            }
        })
        .map_err(IpcError::Io)
}

/// Spawns the subscriber's writer thread, which drains `outbox` onto `stream`.
///
/// The loop ends when `shutdown` is set or a write fails. On a write failure
/// `connected_peers` is decremented, saturating at zero so that a reader thread
/// that already decremented it cannot cause a wrap to `usize::MAX`.
/// While the outbox is empty the thread sleeps for `tx_idle_wait_duration()`,
/// or spins when that is `None`.
///
/// # Errors
/// Returns `IpcError::Io` if the thread cannot be spawned.
fn spawn_writer_sub(
    mut stream: TcpStream,
    outbox: ByteQueue,
    connected_peers: Arc<AtomicUsize>,
    shutdown: Arc<AtomicBool>,
) -> IpcResult<JoinHandle<()>> {
    thread::Builder::new()
        .name("roplat-ipc-tcp-sub-tx".into())
        .spawn(move || {
            while !shutdown.load(Ordering::Acquire) {
                // Scope the lock so it is released before the blocking socket write.
                let frame = { outbox.lock().expect("outbox poisoned").pop_front() };
                match frame {
                    Some(buf) => {
                        if write_frame(&mut stream, &buf).is_err() {
                            // `checked_sub` returns `None` at zero, leaving the counter untouched.
                            let _ = connected_peers.fetch_update(
                                Ordering::AcqRel,
                                Ordering::Acquire,
                                |n| n.checked_sub(1),
                            );
                            break;
                        }
                    }
                    None => {
                        if let Some(d) = tx_idle_wait_duration() {
                            thread::sleep(d);
                        } else {
                            std::hint::spin_loop();
                        }
                    }
                }
            }
        })
        .map_err(IpcError::Io)
}

/// Publisher-side writer body for one peer: pops frames from `outbox` and writes
/// them to `stream` until `shutdown` is set or a write fails.
///
/// While the outbox is empty it sleeps for `tx_idle_wait_duration()`, or spins
/// when that is `None`.
fn writer_loop_pub(mut stream: TcpStream, outbox: ByteQueue, shutdown: Arc<AtomicBool>) {
    loop {
        if shutdown.load(Ordering::Acquire) {
            return;
        }

        // The guard is a temporary of this statement, so the lock is released
        // before the socket write below.
        let next = outbox.lock().expect("outbox poisoned").pop_front();

        if let Some(payload) = next {
            if write_frame(&mut stream, &payload).is_err() {
                return;
            }
            continue;
        }

        match tx_idle_wait_duration() {
            Some(pause) => thread::sleep(pause),
            None => std::hint::spin_loop(),
        }
    }
}

// ============================================================
// Frame encoding/decoding and JSON line protocol
// ============================================================

/// Writes one frame: a big-endian `u32` byte count followed by the payload.
///
/// # Errors
/// Returns `InvalidInput`, without writing anything, when the payload length does
/// not fit in the `u32` prefix. The previous `as u32` cast truncated silently,
/// which would desynchronise framing for every later message. Socket errors are
/// propagated unchanged.
fn write_frame(stream: &mut TcpStream, bytes: &[u8]) -> std::io::Result<()> {
    if bytes.len() > u32::MAX as usize {
        return Err(std::io::Error::new(
            std::io::ErrorKind::InvalidInput,
            format!(
                "frame too large for u32 length prefix: {} bytes",
                bytes.len()
            ),
        ));
    }
    // Lossless: the bound was checked above.
    let len = bytes.len() as u32;
    stream.write_all(&len.to_be_bytes())?;
    stream.write_all(bytes)
}

/// Reads one length-prefixed frame (big-endian `u32` length + payload).
///
/// Returns `Ok(None)` when the stream's read timeout expires before the first
/// byte of a frame arrives, so the caller can poll its shutdown flag. Once a
/// frame has started, timeouts no longer abandon it: the read resumes where it
/// stopped. Previously a timeout in the middle of the header discarded the bytes
/// already read and desynchronised the stream, and a timeout in the payload was
/// reported as a fatal error.
///
/// # Errors
/// - `IpcError::PeerGone` when the peer closes the connection.
/// - `IpcError::Protocol` when the announced length exceeds 64 MiB.
/// - An I/O error for any other socket failure, or when a started frame stalls
///   for too many consecutive timeouts.
fn read_frame(stream: &mut TcpStream) -> IpcResult<Option<Vec<u8>>> {
    const MAX_FRAME_LEN: usize = 64 * 1024 * 1024;

    let mut len_buf = [0u8; 4];
    if !read_full_or_idle(stream, &mut len_buf, true)? {
        return Ok(None);
    }
    let len = u32::from_be_bytes(len_buf) as usize;
    if len > MAX_FRAME_LEN {
        return Err(IpcError::Protocol(format!("frame too large: {len}")));
    }
    let mut buf = vec![0u8; len];
    read_full_or_idle(stream, &mut buf, false)?;
    Ok(Some(buf))
}

/// Fills `buf` completely from `stream`, resuming across read timeouts.
///
/// Returns `Ok(false)` only when `idle_ok` is set and a timeout occurs before a
/// single byte was read; returns `Ok(true)` once `buf` is full.
fn read_full_or_idle(stream: &mut TcpStream, buf: &mut [u8], idle_ok: bool) -> IpcResult<bool> {
    // Consecutive timeouts tolerated inside a started read. This bounds how long
    // a stalled peer can keep the reader thread from noticing shutdown; the
    // wall-clock value depends on the read timeout configured by the caller.
    const MAX_MID_FRAME_STALLS: u32 = 200;

    let mut filled = 0usize;
    let mut stalls = 0u32;
    while filled < buf.len() {
        match stream.read(&mut buf[filled..]) {
            Ok(0) => return Err(IpcError::PeerGone),
            Ok(n) => {
                filled += n;
                stalls = 0;
            }
            Err(e) => match e.kind() {
                std::io::ErrorKind::Interrupted => {}
                std::io::ErrorKind::WouldBlock | std::io::ErrorKind::TimedOut => {
                    if idle_ok && filled == 0 {
                        return Ok(false);
                    }
                    stalls += 1;
                    if stalls >= MAX_MID_FRAME_STALLS {
                        return Err(e.into());
                    }
                }
                _ => return Err(e.into()),
            },
        }
    }
    Ok(true)
}

/// Serialises `v` as JSON and writes it to `stream` as a single line terminated by `\n`.
///
/// # Errors
/// `IpcError::Serde` (carrying the serializer's message) if encoding fails;
/// otherwise any I/O error from the write.
fn send_json_line<T: serde::Serialize>(stream: &mut TcpStream, v: &T) -> IpcResult<()> {
    let mut payload = serde_json::to_vec(v).map_err(|err| IpcError::Serde(err.to_string()))?;
    payload.push(b'\n');
    stream.write_all(&payload)?;
    Ok(())
}

fn recv_json_line<T: for<'de> serde::Deserialize<'de>>(stream: &mut TcpStream) -> IpcResult<T> {
    let mut reader = BufReader::new(stream);
    let mut line = String::new();
    reader.read_line(&mut line)?;
    if line.is_empty() {
        return Err(IpcError::PeerGone);
    }
    serde_json::from_str(line.trim_end()).map_err(|e| IpcError::Serde(e.to_string()))
}