rs_ctrl_os 0.4.2

A small runtime library for node discovery, ZeroMQ pub/sub messaging, dynamic TOML config reloading, and simple time synchronization in distributed Rust systems.
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
# rs_ctrl_os

`rs_ctrl_os` 是一个用于构建分布式节点控制系统的小型运行时库,提供:

- **节点发现**:基于 UDP 多播的心跳机制(`Heartbeat` + `ServiceRegistry`- **消息通信**:基于 ZeroMQ 的 pub/sub 抽象(`PubSubManager`- **配置管理**:TOML 配置加载 + 动态热更新(`ConfigManager` / `load_config_typed`- **时间同步**:简单的主从时钟同步(`TimeSynchronizer`- **统一错误/日志**`RsCtrlError` + `tracing` 日志初始化

适合需要在局域网内跑多进程/多节点,进行“互相发现 + 消息分发 + 动态配置”的系统。

---

## 框架能力与边界

### 框架负责什么

| 能力 | 说明 |
|------|------|
| **StaticBase 与 [static_config]** | 节点 ID、host、port、是否 master、publishers/subscribers 拓扑、static_nodes(IP fallback)、publish_hz/subscribe_hz、dynamic_load_enable 等**运行所需的基础配置** |
| **配置加载机制** | 从 TOML 解析 `[static_config]`,提供 `load_config_rcos``load_config_typed``ConfigManager` 等 API |
| **dynamic 热更新** |`dynamic_load_enable=true` 时,监听配置文件变化并热重载 `[dynamic]` 内容 |
| **消息通道** | ZMQ pub/sub、发现、时间同步、频率限速、原始字节透传(`publish_raw` / `try_recv_raw`|

### 框架不负责什么

| 边界 | 说明 |
|------|------|
| **[dynamic] 的结构与语义** | 框架只负责「加载并热更新」`[dynamic]`**不定义**其字段。每个应用自行定义 `D: Deserialize`,例如 CAN 接口列表、电机参数、相机参数等 |
| **业务数据内容** | 图像、点云等大体量数据应通过 topic 传输,**不应**塞进 TOML。配置中只放「如何连接、参数、schema 版本」等元信息 |
| **业务协议与编码** | 消息 payload 的序列化方式(bincode / raw / JPEG 等)由应用选择;框架提供 `publish_topic`(bincode)、`publish_raw`(透传)两种能力 |

### 如何区分框架配置与业务配置

- **`[static_config]`**:框架强依赖,**必须存在**。包含 `my_id``host``port``is_master``publish_hz``subscribe_hz``dynamic_load_enable``publishers``subscribers``static_nodes`(可选)等。
- **`[dynamic]`**:业务自由定义。框架不解析其具体字段,只负责按你提供的 `D` 反序列化并(可选)热更新。  
  例如:can_bridge 定义 `interfaces``devices`;相机节点定义 `camera_id``resolution`;点云节点定义 `voxel_size` 等。

### 框架在背后完成的工作

以下能力由框架自动完成,应用通常无需关心实现细节:

| 模块 | 后台行为 |
|------|----------|
| **节点发现** | 启动两个线程:发送端每 1 秒向 `224.0.0.100:9999` 广播本节点 `Heartbeat`;接收端持续收取其它节点心跳,更新 `ServiceRegistry`,超过 10 秒未收到则从注册表剔除。 |
| **时间同步** | 接收端收到 `is_master=true` 的心跳时,提取其 `clock_time_ms`,计算本地与 master 的时钟偏移并低通滤波。`now_corrected_ms()` 内部使用该偏移修正当前时间。 |
| **ConfigManager 热更新** |`dynamic_load_enable=true` 时,通过 `notify` 监听配置文件。文件变化时自动重读并解析 `[dynamic]`,更新内部 `RwLock``get_dynamic_clone()` 返回最新值。 |
| **发布频率控制** |`publish_hz > 0` 时,`publish_topic` / `publish_raw` 内部按 `topic_key` 记录上次发送时间,超过最小间隔的请求会被静默丢弃(限频)。 |
| **订阅频率控制** |`subscribe_hz > 0` 时,`try_recv_raw` / `try_recv_specific` 内部按 `local_name` 记录上次轮询时间,未到间隔则直接返回 `None`,避免过度轮询。 |
| **订阅连接建立** | 初始化时,若 discovery 和 `static_nodes` 均未提供目标地址,订阅进入 `pending_subs``try_recv_raw` 内部自动 tick(),优先从 `ServiceRegistry` 解析,其次从 `static_nodes``node_id -> "host:port"`)fallback,**无需手动调用 tick()**|
| **子话题过滤** | 若通过 `set_sub_topics` 设置了白名单,框架在 `try_recv_raw` 中只返回白名单内的 `sub_topic`,其它消息静默丢弃。 |

---

## 使用教程(一步一步)

### 0. 准备环境

- 安装 ZeroMQ 库(不同平台命令略有差异,以下是常见示例):
  - Debian/Ubuntu: `sudo apt-get install libzmq3-dev`
  - Fedora: `sudo dnf install zeromq zeromq-devel`
  - macOS(Homebrew): `brew install zeromq`
- 安装 Rust 稳定版(建议用 `rustup`)。

克隆本项目:

```bash
git clone https://github.com/LycanW/rs_ctrl_os.git
cd rs_ctrl_os
```

或者使用 cargo 添加依赖:

```bash
cargo add rs_ctrl_os
```

### 1. 跑一个最简单的 pub/sub 示例

准备一个最小的配置(你也可以用仓库里的 `example_config.toml`):

```toml
[static_config]
my_id = "node1"
host = "127.0.0.1"
port = 5555
is_master = true
publish_hz = 1000
subscribe_hz = 1000
dynamic_load_enable = true

[static_config.subscribers]
local_sub = "node1"

[static_config.publishers]
control = "self"

# 可选:当 discovery 未找到目标时,用此地址直连(适合无多播环境)
[static_config.static_nodes]
# node1 = "127.0.0.1:5555"

[dynamic]
message_prefix = "hello"
interval_ms = 200
```

在项目根目录运行:

```bash
cargo run --example pub_node -- example_config.toml
```

`pub_node` 会持续往 `control` topic 发布消息。**注意**:当前 `pub_node` 示例仅发布、不订阅,因此不会打印收到的消息。若需同时收发,可参考 `examples/` 自行扩展,或运行两个进程(见下文)。

如果你改动配置文件里的 `[dynamic]`(比如改前缀、改间隔)并保存,进程会自动加载新的动态配置:

- `message_prefix` 会改变打印出来的文本前缀;
- `interval_ms` 会改变发送/接收的频率。

### 2. 跑两个进程:一个 pub,一个 sub

有时候你想在两个不同的进程里测试 pub/sub。可以用仓库里的:

- `examples/pub_node.rs`
- `examples/sub_node.rs`

以及对应的 `pub_config.toml`、`sub_config.toml`。

**配置说明**:`sub_config.toml` 中 `local_sub` 指向的 `target_node_id` 需与 `pub_node` 的 `my_id` 一致,才能收到 pub 的消息。仓库中的 `sub_config.toml` 可能指向其他节点(如 `gateway_node_01`),用于不同场景。若要 pub/sub 互通,可将 `[static_config.subscribers]` 改为 `local_sub = "pub_node"`,并配置 `static_nodes` 或依赖 discovery 解析地址。

先打开一个终端作为发布端:

```bash
cargo run --example pub_node -- pub_config.toml
```

再打开另一个终端作为订阅端:

```bash
cargo run --example sub_node -- sub_config.toml
```

此时:

- `pub_node` 会按照 `pub_config.toml``[dynamic]` 配置,持续往 ZeroMQ PUB socket 上发消息。
- `sub_node` 通过 discovery 或 `static_nodes` 连接 `pub_node`,从 `local_sub` 收消息并打印。

你可以动态修改 `pub_config.toml` 的 `[dynamic]`,比如:

```toml
[dynamic]
message_prefix = "pub1"
interval_ms = 200
```

改成:

```toml
[dynamic]
message_prefix = "PUB-UPDATED"
interval_ms = 1000
```

保存之后,几百毫秒到一两秒内你会看到:

- `sub_node` 打印出的消息前缀从 `pub1` 变成 `PUB-UPDATED`- 输出频率从 200ms 一条变成大约 1 秒一条。

---

## API 参考

### 1. 初始化

#### `init_logging()`

初始化 `tracing` 日志,默认 INFO 级别。应在 `main` 入口处调用一次。

```rust
use rs_ctrl_os::init_logging;
init_logging();
```

---

### 2. 配置管理

#### `load_config_rcos(path) -> Result<(StaticBase, toml::Value)>`

从 TOML 加载配置,返回框架静态配置 + 原始 `[dynamic]`(`toml::Value`)。  
适用于需要手动反序列化 `[dynamic]` 或只需 `static_config` 的场景。  
- **path**:配置文件路径(`impl AsRef<Path>`  
- **返回**`(StaticBase, toml::Value)``[dynamic]` 缺失时返回空表

#### `load_config_typed::<D>(path) -> Result<(StaticBase, D)>`

一次性加载配置,返回强类型 `(StaticBase, D)`。无文件监听,无热更新开销。  
- **D**:需实现 `Deserialize`,对应 `[dynamic]` 结构  
- **适用**:不需要热重载的节点

#### `ConfigManager<D>`

带热重载的配置管理器。当 `dynamic_load_enable=true` 时,通过 `notify` 监听文件变化并自动重载 `[dynamic]`。

| 方法 | 签名 | 说明 |
|------|------|------|
| `new` | `new(config_path: &Path) -> Result<Self>` | 加载配置并(可选)启动文件监听 |
| `static_cfg` | `static_cfg(&self) -> &StaticBase` | 获取静态配置引用 |
| `get_dynamic_clone` | `get_dynamic_clone(&self) -> D` | 获取当前 `[dynamic]` 的克隆(热更新后为最新值) |
| `config_path` | `config_path(&self) -> &Path` | 配置文件路径 |

**D 约束**:`Clone + Deserialize + Send + Sync + 'static`

#### `StaticBase`

框架静态配置结构体,从 TOML `[static_config]` 解析。

| 字段 | 类型 | 默认 | 说明 |
|------|------|------|------|
| `my_id` | `String` | - | 本节点唯一 ID |
| `host` | `String` | - | 本节点监听地址(如 `127.0.0.1`|
| `port` | `u16` | - | 本节点监听端口 |
| `is_master` | `bool` | `false` | 是否作为时间同步 master |
| `publishers` | `HashMap<String, String>` | `{}` | `topic_key -> "node_id"``"self"`(本地绑定) |
| `subscribers` | `HashMap<String, String>` | `{}` | `local_name -> target_node_id` |
| `static_nodes` | `HashMap<String, String>` | `{}` | `node_id -> "host:port"`,discovery 失败时的 fallback |
| `publish_hz` | `i64` | - | 发布频率上限:`>0` 限频,`0` 不限,`<0` 禁止发布 |
| `subscribe_hz` | `i64` | - | 订阅轮询频率:`>0` 限频,`0` 不限,`<0` 禁止订阅 |
| `dynamic_load_enable` | `bool` | `true` | 是否启用 `[dynamic]` 热更新 |

---

### 3. 节点发现

#### `start_discovery(...) -> Result<ServiceRegistry>`

```rust
pub fn start_discovery(
    my_id: &str,
    my_host: &str,
    my_port: u16,
    is_master: bool,
    time_sync: Option<Arc<TimeSynchronizer>>,
) -> Result<ServiceRegistry>
```

启动 UDP 多播发现(`224.0.0.100:9999`)。后台线程每 1 秒广播心跳,接收其他节点心跳并更新注册表。  
- **time_sync**:传入 `TimeSynchronizer` 时,会从 master 心跳中提取 `clock_time_ms` 进行时间同步  
- **返回**:共享的 `ServiceRegistry`,供 `PubSubManager` 解析订阅目标地址

#### `ServiceRegistry`

节点注册表,内部维护 `node_id -> (host, port, timestamp)`。

| 方法 | 签名 | 说明 |
|------|------|------|
| `new` | `new() -> Self` | 创建空注册表 |
| `register` | `register(&self, hb: &Heartbeat)` | 注册/更新节点(框架内部使用) |
| `get_address` | `get_address(&self, node_id: &str) -> Option<(String, u16)>` | 根据 node_id 获取 `(host, port)` |
| `cleanup` | `cleanup(&self, timeout_secs: u64)` | 剔除超时未心跳的节点(框架内部每轮调用) |

#### `Heartbeat`

心跳消息结构(JSON 序列化,用于发现协议)。通过 `rs_ctrl_os::discovery::Heartbeat` 访问(未在 crate 根重导出)。

```rust
pub struct Heartbeat {
    pub node_id: String,
    pub host: String,
    pub port: u16,
    pub timestamp: u64,
    pub clock_time_ms: u64,
    pub is_master: bool,
}
```

---

### 4. ZeroMQ Pub/Sub(PubSubManager)

#### 创建

```rust
pub fn new(static_cfg: &StaticBase, registry: ServiceRegistry) -> Result<Self>
```

根据 `static_config` 的 `publishers`/`subscribers` 绑定 PUB、连接 SUB。`target = "self"` 的 topic 共用本机 PUB socket。

#### 频率与过滤

| 方法 | 签名 | 说明 |
|------|------|------|
| `set_publish_hz` | `set_publish_hz(&mut self, hz: i64)` | **覆盖**发布频率。`new()` 已从 `static_config` 注入,通常无需调用;仅在运行时需修改时使用 |
| `set_subscribe_hz` | `set_subscribe_hz(&mut self, hz: i64)` | **覆盖**订阅轮询频率。同上,一般依赖配置即可 |
| `set_sub_topics` | `set_sub_topics(&mut self, local_name: &str, topics: &[S]) -> Result<()>` |`local_name` 设置 sub_topic 白名单,仅返回列表内的消息;空列表表示不过滤 |
| `tick` | `tick(&mut self) -> Result<()>` | 尝试为 `pending_subs` 建立连接;`try_recv_raw` 内部会自动调用,一般无需手动调用 |

#### 发布

| 方法 | 签名 | 说明 |
|------|------|------|
| `publish_topic` | `publish_topic<T: Serialize>(&mut self, topic_key: &str, sub_topic: &str, data: &T) -> Result<()>` | Bincode 序列化后发送,适合结构化小消息 |
| `publish_raw` | `publish_raw(&mut self, topic_key: &str, sub_topic: &str, payload: &[u8]) -> Result<()>` | 透传原始字节,适合图像、点云等已编码数据 |

**消息格式**:ZMQ 三帧 multipart `[节点ID, sub_topic, payload]`

**频率控制**:当 `publish_hz > 0` 时,按 `topic_key` 限频,超频请求静默丢弃。

#### 接收

| 方法 | 签名 | 说明 |
|------|------|------|
| `try_recv_raw` | `try_recv_raw(&mut self, local_name: &str) -> Result<Option<(String, Vec<u8>)>>` | 非阻塞接收,返回 `(sub_topic, payload)`;内部自动 `tick()` 并做频率限制 |
| `try_recv_specific` | `try_recv_specific<T: Deserialize>(&mut self, local_name: &str, target_sub: &str) -> Result<Option<T>>` | 仅当 `sub_topic == target_sub` 时用 bincode 反序列化为 `T`,否则返回 `None` |

**频率控制**:当 `subscribe_hz > 0` 时,按 `local_name` 限频,未到间隔返回 `None`。

---

### 5. 时间同步(TimeSynchronizer)

主从时钟同步,从 `is_master=true` 的心跳中提取 `clock_time_ms` 计算偏移。

| 方法 | 签名 | 说明 |
|------|------|------|
| `new` | `new() -> Self` | 创建同步器,初始未同步 |
| `update_from_master` | `update_from_master(&self, master_id: &str, master_ts_ms: u64)` | 由发现模块内部调用,应用一般不直接使用 |
| `now_corrected_ms` | `now_corrected_ms(&self) -> u64` | 返回经偏移修正的当前时间(毫秒) |
| `is_synced` | `is_synced(&self) -> bool` | 是否已与 master 同步 |

**用法**:将 `Arc::new(TimeSynchronizer::new())` 传入 `start_discovery`,之后用 `now_corrected_ms()` 获取协调后的时间戳。

---

### 6. 错误类型

```rust
use rs_ctrl_os::{Result, RsCtrlError};
```

| 变体 | 说明 |
|------|------|
| `Config(String)` | 配置加载/解析错误 |
| `Comms(String)` | 通信错误(如 topic 未找到) |
| `Serialization(String)` | 序列化错误 |
| `Discovery(String)` | 发现模块错误 |
| `NodeNotFound(String)` | 注册表中无此 node_id |
| `Io(std::io::Error)` | IO 错误 |
| `Zmq(zmq::Error)` | ZeroMQ 错误 |
| `Bincode(Box<bincode::ErrorKind>)` | Bincode 序列化/反序列化错误 |

所有 API 返回 `rs_ctrl_os::Result<T>`,可用 `?` 传播。

---

## 安装

在你的 `Cargo.toml` 中添加依赖:

```toml
[dependencies]
rs_ctrl_os = "0.4.2"
```
或者也可以

```bash
cargo add rs_ctrl_os
```

你也可以通过路径依赖 / git 依赖方式在本地使用:

```toml
[dependencies]
rs_ctrl_os = { path = "./rs_ctrl_os" }
```

---

## 快速上手

下面是一个简单的单进程 pub/sub 示例,展示主要 API 的使用方式。

### 1. 初始化日志

```rust
use rs_ctrl_os::init_logging;

fn main() {
    init_logging();
    // ...
}
```

### 2. 从 TOML 加载配置(静态 + 动态)

```toml
# example_config.toml
[static_config]
my_id = "node1"
host = "127.0.0.1"
port = 5555
is_master = true
publish_hz = 1000
subscribe_hz = 1000
dynamic_load_enable = true

[static_config.subscribers]
local_sub = "node1"

[static_config.publishers]
control = "self"

[dynamic]
message_prefix = "hello"
```

**方式一:需要热重载时,用 ConfigManager**

```rust
use std::path::Path;
use serde::Deserialize;
use rs_ctrl_os::ConfigManager;

#[derive(Clone, Deserialize)]
struct DynamicCfg {
    message_prefix: String,
    interval_ms: u64,
}

fn main() -> rs_ctrl_os::Result<()> {
    rs_ctrl_os::init_logging();

    let manager: ConfigManager<DynamicCfg> =
        ConfigManager::new(Path::new("example_config.toml"))?;
    let static_cfg = manager.static_cfg().clone();

    // 通过 manager.get_dynamic_clone() 获取最新 dynamic(文件变化时自动更新)
    Ok(())
}
```

**方式二:不需要热重载时,用 load_config_typed**

```rust
use serde::Deserialize;
use rs_ctrl_os::load_config_typed;

#[derive(Clone, Deserialize)]
struct DynamicCfg {
    message_prefix: String,
}

fn main() -> rs_ctrl_os::Result<()> {
    rs_ctrl_os::init_logging();

    let (static_cfg, dynamic) = load_config_typed::<DynamicCfg>("example_config.toml")?;
    // 一次性加载,无 watcher 开销
    Ok(())
}
```

### 3. 启动节点发现 + 时间同步

```rust
use std::sync::Arc;
use rs_ctrl_os::{start_discovery, TimeSynchronizer};

fn main() -> rs_ctrl_os::Result<()> {
    init_logging();
    // ... 加载配置

    let time_sync = Arc::new(TimeSynchronizer::new());

    let registry = start_discovery(
        &static_cfg.my_id,
        &static_cfg.host,
        static_cfg.port,
        static_cfg.is_master,
        Some(time_sync.clone()),
    )?;

    // registry 会在后台线程持续更新
    Ok(())
}
```

### 4. 创建 Pub/Sub 管理器并发送消息

以下片段需与步骤 1(**方式一** ConfigManager)、步骤 3 组合使用,才能获得 `manager`、`static_cfg`、`registry`、`time_sync`。

```rust
use rs_ctrl_os::PubSubManager;
use std::thread;
use std::time::Duration;

fn main() -> rs_ctrl_os::Result<()> {
    init_logging();
    // ... 步骤 1(ConfigManager)+ 步骤 3(start_discovery),得到 manager, static_cfg, registry, time_sync

    let mut bus = PubSubManager::new(&static_cfg, registry)?;

    loop {
        let dyn_cfg = manager.get_dynamic_clone();
        let ts_ms = time_sync.now_corrected_ms();

        let payload = format!(
            "{} from {} at {} ms",
            dyn_cfg.message_prefix, static_cfg.my_id, ts_ms
        );

        // topic_key = "control",sub_topic = "demo"(bincode 序列化)
        bus.publish_topic("control", "demo", &payload)?;

        // 图像/点云等二进制可用 publish_raw 透传,无需 bincode
        // bus.publish_raw("camera", "frame", &jpeg_bytes)?;

        if let Some(received) = bus.try_recv_specific::<String>("local_sub", "demo")? {
            println!("Received: {received}");
        }

        // 简单示例:按 subscribe_hz 驱动主循环节奏
        let interval = if static_cfg.subscribe_hz > 0 {
            Duration::from_secs_f64(1.0 / static_cfg.subscribe_hz as f64)
        } else {
            Duration::from_millis(100)
        };
        thread::sleep(interval);
    }
}
```

---

## 示例(examples)

### 内置示例

- `examples/pub_node.rs` / `examples/sub_node.rs`:单 pub + 单 sub,pub 使用 `ConfigManager` 热重载,sub 使用 `load_config_typed` 一次性加载。
- `examples/multi_pub_node.rs` / `examples/multi_sub_node.rs`:多子话题 pub/sub,`multi_sub_node` 使用 `set_sub_topics` 过滤子话题;两者均通过 `try_recv_raw` 接收原始 payload 并自行反序列化。

### 实际项目示例

**[can_bridge](https://github.com/LycanW/can_bridge)**:CAN 总线网关,将 Linux SocketCAN 与 ZeroMQ 打通,实现 CAN ↔ 分布式消息的双向桥接。基于 rs_ctrl_os 构建,典型用法包括:

- 使用 `ConfigManager` 加载配置并热重载 `[dynamic]`(接口、设备、控制开关等)
- `start_discovery` + `PubSubManager` 实现传感器数据发布(`sensor_mit` / `sensor_dji` / `sensor_imu`)与控制指令订阅(`ctrl_mit` / `ctrl_dji`- `publish_topic` 发布解析后的传感器 JSON,`try_recv_raw` 接收控制命令
- `[static_config]` 完全遵循 rs_ctrl_os 规范

可作为将 rs_ctrl_os 应用于机器人/嵌入式桥接场景的参考。

运行示例(在项目根目录):

```bash
# 简单 pub/sub
cargo run --example pub_node -- example_config.toml

# 多 pub / 多 sub
cargo run --example multi_pub_node -- multi_pub_config.toml
cargo run --example multi_sub_node -- multi_sub_config.toml
```

---

## 错误处理

库统一使用:

```rust
use rs_ctrl_os::{Result, RsCtrlError};
```

`RsCtrlError` 覆盖了:

- 配置错误:`Config(String)`
- 通信错误:`Comms(String)`
- 序列化错误:`Serialization(String)`
- 发现错误:`Discovery(String)`
- 节点未找到:`NodeNotFound(String)`
- IO 错误:`Io(std::io::Error)`
- ZeroMQ 错误:`Zmq(zmq::Error)`
- Bincode 序列化错误:`Bincode(Box<bincode::ErrorKind>)`

绝大多数 API 都返回 `rs_ctrl_os::Result<T>`,便于在上层直接用 `?` 传播。

---

## 许可证

本项目采用 **MIT** 许可证发布。  
详见 `LICENSE` 文件。