iflow-cli-sdk-rust 0.1.0

Rust SDK for iFlow CLI using Agent Communication Protocol
# iFlow Rust SDK Performance Optimization Plan

## 1. Current Performance Analysis

A review of the existing code surfaced the following performance issues:

1. **Frequent lock contention**: the `connected` flag is protected by a `Mutex`, which can become a bottleneck under high concurrency
2. **Message-passing efficiency**: `mpsc::unbounded_channel` lets the queue grow without limit, so memory usage is effectively unbounded
3. **Connection retry mechanism**: WebSocket connection retries use no exponential backoff, which can put unnecessary load on the server
4. **Delayed resource release**: some resources are only released on `Drop`, which can hurt performance

## 2. Optimization Goals

1. Reduce lock contention to improve concurrent performance
2. Improve the message-passing mechanism to prevent unbounded memory growth
3. Improve the retry mechanism to reduce server load
4. Release resources promptly to improve resource utilization

## 3. Optimization Plan

### 3.1 Reduce Lock Contention

#### Current Problem
In `IFlowClient`, the `connected` flag is protected by a `Mutex`, so every check has to acquire the lock:

```rust
if *self.connected.lock().await {
    // ...
}
```

#### Proposed Change
Replace the `Mutex` with the atomic type `AtomicBool`:

```rust
use std::sync::atomic::{AtomicBool, Ordering};

pub struct IFlowClient {
    options: IFlowOptions,
    message_receiver: Arc<Mutex<mpsc::UnboundedReceiver<Message>>>,
    message_sender: mpsc::UnboundedSender<Message>,
    connected: Arc<AtomicBool>, // AtomicBool instead of Mutex<bool>
    connection: Option<Connection>,
    logger: Option<MessageLogger>,
}

impl IFlowClient {
    pub fn new(options: Option<IFlowOptions>) -> Self {
        let options = options.unwrap_or_default();
        let (sender, receiver) = mpsc::unbounded_channel();
        
        // Initialize logger if enabled
        let logger = if options.log_config.enabled {
            MessageLogger::new(options.log_config.clone()).ok()
        } else {
            None
        };

        Self {
            options,
            message_receiver: Arc::new(Mutex::new(receiver)),
            message_sender: sender,
            connected: Arc::new(AtomicBool::new(false)), // start out disconnected
            connection: None,
            logger,
        }
    }
    
    pub async fn connect(&mut self) -> Result<()> {
        // Check the flag with an atomic load instead of taking a lock
        if self.connected.load(Ordering::Relaxed) {
            tracing::warn!("Already connected to iFlow");
            return Ok(());
        }

        // Connection logic...

        // Mark as connected once the connection is established
        self.connected.store(true, Ordering::Relaxed);
        Ok(())
    }

    pub async fn disconnect(&mut self) -> Result<()> {
        // Mark as disconnected
        self.connected.store(false, Ordering::Relaxed);

        // Disconnection logic...
        Ok(())
    }
}
```
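
A note on memory ordering: `Ordering::Relaxed` is sufficient when the flag itself is the only shared state, but if other fields are expected to be visible to readers once `connected` reads `true`, pairing the store with `Release` and the load with `Acquire` is the safer choice. Below is a minimal, self-contained sketch of that pattern, independent of `IFlowClient` and using plain threads purely for illustration:

```rust
use std::sync::Arc;
use std::sync::atomic::{AtomicBool, Ordering};
use std::thread;

fn main() {
    let connected = Arc::new(AtomicBool::new(false));

    // Readers check the flag without any lock; a load is a single atomic
    // instruction, so readers never contend with each other.
    let readers: Vec<_> = (0..4)
        .map(|i| {
            let connected = Arc::clone(&connected);
            thread::spawn(move || {
                while !connected.load(Ordering::Acquire) {
                    thread::yield_now();
                }
                println!("reader {i} observed connected = true");
            })
        })
        .collect();

    // The writer publishes the flag; Release pairs with the readers' Acquire,
    // so anything written before this store is visible to them afterwards.
    connected.store(true, Ordering::Release);

    for r in readers {
        r.join().unwrap();
    }
}
```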

### 3.2 Improve the Message-Passing Mechanism

#### Current Problem
`mpsc::unbounded_channel` allows memory to grow without bound, especially when messages are consumed more slowly than they are produced.

#### Proposed Change
Use a bounded channel and expose its size as a configuration option:

```rust
// Add a message queue size option to IFlowOptions
impl IFlowOptions {
    /// Set the message queue size for the client
    ///
    /// # Arguments
    /// * `size` - The maximum number of messages to queue (0 for unbounded)
    pub fn with_message_queue_size(mut self, size: usize) -> Self {
        self.message_queue_size = size;
        self
    }
}

// Add the configuration field in types.rs
#[derive(Debug, Clone)]
pub struct IFlowOptions {
    // ... other fields
    
    /// Message queue size (0 for unbounded)
    pub message_queue_size: usize,
}

impl Default for IFlowOptions {
    fn default() -> Self {
        Self {
            // ... other default values
            message_queue_size: 100, // default queue capacity of 100 messages
        }
    }
}

// Change how the channel is created in client.rs
impl IFlowClient {
    pub fn new(options: Option<IFlowOptions>) -> Self {
        let options = options.unwrap_or_default();
        
        // Create a bounded or unbounded channel depending on the configuration
        let (sender, receiver) = if options.message_queue_size > 0 {
            let (sender, receiver) = mpsc::channel(options.message_queue_size);
            (sender, receiver)
        } else {
            let (sender, receiver) = mpsc::unbounded_channel();
            // The receiver end has a different type and needs to be adapted;
            // this is simplified here, a real implementation needs proper type adaptation
            unimplemented!("bounded/unbounded channel adaptation required")
        };
        
        // Initialize logger if enabled
        let logger = if options.log_config.enabled {
            MessageLogger::new(options.log_config.clone()).ok()
        } else {
            None
        };

        Self {
            options,
            message_receiver: Arc::new(Mutex::new(receiver)),
            message_sender: sender,
            connected: Arc::new(AtomicBool::new(false)),
            connection: None,
            logger,
        }
    }
}
```

To simplify the implementation, we can create a unified message-channel interface:

```rust
// Define a message channel trait in client.rs
trait MessageChannel {
    fn send(&self, message: Message) -> Result<(), mpsc::error::SendError<Message>>;
    fn is_closed(&self) -> bool;
}

// Unbounded channel implementation
struct UnboundedMessageChannel {
    sender: mpsc::UnboundedSender<Message>,
}

impl MessageChannel for UnboundedMessageChannel {
    fn send(&self, message: Message) -> Result<(), mpsc::error::SendError<Message>> {
        self.sender.send(message)
    }
    
    fn is_closed(&self) -> bool {
        // An unbounded channel never rejects messages because the queue is full
        false
    }
}

// Bounded channel implementation
struct BoundedMessageChannel {
    sender: mpsc::Sender<Message>,
}

impl MessageChannel for BoundedMessageChannel {
    fn send(&self, message: Message) -> Result<(), mpsc::error::SendError<Message>> {
        // Sending on a bounded channel must happen in an async context,
        // which would require making this API async
        unimplemented!("requires async context support")
    }
    
    fn is_closed(&self) -> bool {
        self.sender.is_closed()
    }
}

// Change IFlowClient to hold a trait object
pub struct IFlowClient {
    options: IFlowOptions,
    message_receiver: Arc<Mutex<mpsc::UnboundedReceiver<Message>>>,
    message_sender: Box<dyn MessageChannel + Send + Sync>,
    connected: Arc<AtomicBool>,
    connection: Option<Connection>,
    logger: Option<MessageLogger>,
}
```
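
Because the trait approach above runs into the async-method problem for the bounded sender, an alternative worth sketching is a plain enum that wraps both sender types behind one async `send`. The `MessageSender` name and its methods below are illustrative only, not part of the existing API, and the receiver side would need a similar wrapper:

```rust
use tokio::sync::mpsc;

/// Hypothetical unified sender wrapping either flavor of tokio mpsc sender.
enum MessageSender<T> {
    Bounded(mpsc::Sender<T>),
    Unbounded(mpsc::UnboundedSender<T>),
}

impl<T> MessageSender<T> {
    /// Send a message; the bounded variant awaits when the queue is full
    /// (backpressure), the unbounded variant returns immediately.
    async fn send(&self, msg: T) -> Result<(), &'static str> {
        match self {
            MessageSender::Bounded(s) => s.send(msg).await.map_err(|_| "channel closed"),
            MessageSender::Unbounded(s) => s.send(msg).map_err(|_| "channel closed"),
        }
    }

    /// Both sender flavors expose `is_closed`, so this forwards directly.
    fn is_closed(&self) -> bool {
        match self {
            MessageSender::Bounded(s) => s.is_closed(),
            MessageSender::Unbounded(s) => s.is_closed(),
        }
    }
}
```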

To keep the API simple, a more practical option is to wrap message sends in a timeout so that backpressure cannot block the caller indefinitely:

```rust
// Add timeout handling for message sends in client.rs
impl IFlowClient {
    /// Send a message with timeout to prevent blocking
    async fn send_message_with_timeout(&self, message: Message, timeout_secs: f64) -> Result<()> {
        use tokio::time::{timeout, Duration};
        
        timeout(Duration::from_secs_f64(timeout_secs), async {
            self.message_sender.send(message)
                .map_err(|_| IFlowError::connection(
                    IFlowErrorCode::ConnectionClosed,
                    "Message channel closed".to_string()
                ))
        })
        .await
        .map_err(|_| IFlowError::timeout(
            IFlowErrorCode::RequestTimeout,
            "Message send timeout".to_string()
        ))?
    }
}
```
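
To make the behavior concrete, here is a small standalone sketch, using a plain `tokio::sync::mpsc` channel rather than the SDK types, showing how a timeout around a bounded send returns an error instead of stalling the caller when the queue is full:

```rust
use tokio::sync::mpsc;
use tokio::time::{timeout, Duration};

#[tokio::main]
async fn main() {
    // Bounded channel with room for a single message.
    let (tx, mut rx) = mpsc::channel::<u32>(1);

    // The first send succeeds immediately; the queue is now full.
    tx.send(1).await.unwrap();

    // A second send would wait until the receiver drains the queue, so the
    // timeout fires instead of blocking the caller indefinitely.
    let result = timeout(Duration::from_millis(100), tx.send(2)).await;
    assert!(result.is_err(), "send should time out while the queue is full");

    // Once the receiver makes progress, sends go through again.
    assert_eq!(rx.recv().await, Some(1));
    tx.send(2).await.unwrap();
    assert_eq!(rx.recv().await, Some(2));
}
```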

### 3.3 Improve the WebSocket Connection Retry Mechanism

#### Current Problem
WebSocket connection retries use a linear delay, which does not account for server load:

```rust
// Current implementation
let delay = Duration::from_millis(1000 * connect_attempts as u64);
```

#### Proposed Change
Use exponential backoff with random jitter:

```rust
/// Calculate exponential backoff with jitter
///
/// # Arguments
/// * `attempt` - Attempt number (0-based)
/// * `base_delay` - Base delay in milliseconds
/// * `max_delay` - Maximum delay in milliseconds
///
/// # Returns
/// Delay duration with jitter
fn calculate_backoff(attempt: u32, base_delay: u64, max_delay: u64) -> Duration {
    use rand::Rng;
    
    // Calculate exponential backoff: base_delay * 2^attempt
    let exponential_delay = base_delay * 2u64.pow(attempt);
    
    // Cap at max_delay
    let capped_delay = exponential_delay.min(max_delay);
    
    // Add random jitter: 0.5 to 1.5 times the capped delay
    let mut rng = rand::thread_rng();
    let jitter = rng.gen_range(0.5..=1.5);
    let jittered_delay = (capped_delay as f64 * jitter) as u64;
    
    Duration::from_millis(jittered_delay)
}

// Used in the connect_websocket method
async fn connect_websocket(&mut self) -> Result<()> {
    // ... other code
    
    // Connect to WebSocket with exponential backoff
    let mut connect_attempts = 0;
    let max_connect_attempts = 5;
    let base_delay_ms = 1000; // 1 second
    let max_delay_ms = 30000; // 30 seconds
    
    while connect_attempts < max_connect_attempts {
        match transport.connect().await {
            Ok(_) => {
                info!("Successfully connected to WebSocket at {}", final_url);
                break;
            }
            Err(e) => {
                connect_attempts += 1;
                tracing::warn!("Failed to connect to WebSocket (attempt {}): {}", connect_attempts, e);
                
                if connect_attempts >= max_connect_attempts {
                    return Err(IFlowError::connection(
                        IFlowErrorCode::ConnectionFailed,
                        format!("Failed to connect to WebSocket after {} attempts: {}", 
                               max_connect_attempts, e)
                    ));
                }
                
                // Calculate backoff with jitter
                let delay = calculate_backoff(connect_attempts - 1, base_delay_ms, max_delay_ms);
                tracing::info!("Waiting {:?} before retry...", delay);
                tokio::time::sleep(delay).await;
            }
        }
    }
    
    // ... other code
}
```
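
As a quick sanity check, the following test (assuming `calculate_backoff` above is in scope and `rand` is listed in the dependencies) asserts that the nominal delay doubles per attempt and that the jitter stays within the 0.5x to 1.5x band:

```rust
#[test]
fn backoff_grows_and_stays_within_jitter_bounds() {
    let base = 1_000; // 1 second
    let max = 30_000; // 30 seconds
    for attempt in 0..5 {
        // Nominal delay doubles each attempt (1s, 2s, 4s, 8s, 16s), capped at 30s.
        let nominal = (base * 2u64.pow(attempt)).min(max);
        let ms = calculate_backoff(attempt, base, max).as_millis() as u64;
        // Jitter keeps the actual delay within 0.5x..1.5x of the nominal value.
        assert!(ms >= nominal / 2 && ms <= nominal * 3 / 2);
    }
}
```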

### 3.4 Release Resources Promptly

#### Current Problem
Some resources are only released on `Drop`, which can hurt performance.

#### Proposed Change
Release resources proactively at the appropriate points:

```rust
impl IFlowClient {
    /// Disconnect from iFlow and release resources immediately
    pub async fn disconnect(&mut self) -> Result<()> {
        // Mark as disconnected immediately
        self.connected.store(false, Ordering::Relaxed);

        // Proactively release the connection resources
        if let Some(connection) = self.connection.take() {
            match connection {
                Connection::Stdio { acp_client, mut process_manager, session_id: _, initialized: _ } => {
                    // Proactively close the ACP client connection
                    drop(acp_client);
                    
                    // Stop the child process
                    if let Some(mut pm) = process_manager.take() {
                        // Call stop asynchronously without waiting for it to finish
                        tokio::spawn(async move {
                            if let Err(e) = pm.stop().await {
                                tracing::warn!("Error stopping process manager: {}", e);
                            }
                        });
                    }
                }
                Connection::WebSocket { mut acp_protocol, mut process_manager, session_id: _ } => {
                    // Proactively close the protocol connection
                    let _ = acp_protocol.close().await;

                    // Stop the child process
                    if let Some(mut pm) = process_manager.take() {
                        tokio::spawn(async move {
                            if let Err(e) = pm.stop().await {
                                tracing::warn!("Error stopping process manager: {}", e);
                            }
                        });
                    }
                }
            }
        }

        info!("Disconnected from iFlow");
        Ok(())
    }
}

// Improve resource release in IFlowProcessManager as well
impl Drop for IFlowProcessManager {
    fn drop(&mut self) {
        // Proactively stop the process on drop as well
        if let Some(mut process) = self.process.take() {
            // Run on a blocking thread via spawn_blocking
            tokio::task::spawn_blocking(move || {
                // Try to terminate the process from the blocking context
                let _ = process.start_kill();
            });
        }
    }
}
```
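
One caveat about the `Drop` implementation above: `tokio::task::spawn_blocking` panics if it is called outside a Tokio runtime, for example when the client is dropped after the runtime has already shut down. A hedged alternative sketch, falling back to a direct `start_kill()` (a synchronous method on `tokio::process::Child`) when no runtime handle is available:

```rust
use tokio::runtime::Handle;

impl Drop for IFlowProcessManager {
    fn drop(&mut self) {
        if let Some(mut process) = self.process.take() {
            if Handle::try_current().is_ok() {
                // A runtime is available: push the kill onto a blocking thread.
                tokio::task::spawn_blocking(move || {
                    let _ = process.start_kill();
                });
            } else {
                // No runtime: start_kill() is synchronous, so call it directly.
                let _ = process.start_kill();
            }
        }
    }
}
```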

## 4. Performance Testing Plan

### 4.1 Benchmarks

Use Criterion.rs for performance benchmarking:

```rust
// benches/client_benchmark.rs
use criterion::{criterion_group, criterion_main, Criterion};
use iflow_cli_sdk_rust::{IFlowClient, IFlowOptions};
use tokio::runtime::Runtime;

fn benchmark_client_creation(c: &mut Criterion) {
    let mut group = c.benchmark_group("client");
    
    group.bench_function("create_client", |b| {
        b.iter(|| {
            let _client = IFlowClient::new(None);
        })
    });
    
    group.finish();
}

fn benchmark_connect_disconnect(c: &mut Criterion) {
    let mut group = c.benchmark_group("connection");
    
    group.bench_function("connect_disconnect", |b| {
        b.to_async(Runtime::new().unwrap()).iter(|| async {
            let mut client = IFlowClient::new(None);
            // A mock iFlow service is needed to exercise this path
            // let _ = client.connect().await;
            // let _ = client.disconnect().await;
        })
    });
    
    group.finish();
}

criterion_group!(benches, benchmark_client_creation, benchmark_connect_disconnect);
criterion_main!(benches);
```
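
Note that Criterion benchmarks are compiled as separate targets: `Cargo.toml` also needs a `[[bench]]` entry for `client_benchmark` with `harness = false`, and `criterion` must be added as a dev-dependency with its `async_tokio` feature enabled so that `b.to_async(Runtime::new().unwrap())` compiles.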

### 4.2 Load Testing

Create a concurrency test to verify the effect of the optimizations:

```rust
// tests/concurrent_test.rs
use iflow_cli_sdk_rust::{IFlowClient, IFlowOptions};
use tokio::time::{timeout, Duration};

#[tokio::test]
async fn test_concurrent_connections() {
    const NUM_CLIENTS: usize = 10;
    
    let mut handles = vec![];
    
    for _ in 0..NUM_CLIENTS {
        let handle = tokio::spawn(async {
            let mut client = IFlowClient::new(None);
            // Use a timeout so the test cannot hang
            let result = timeout(Duration::from_secs(10), client.connect()).await;
            match result {
                Ok(Ok(_)) => {
                    let _ = client.disconnect().await;
                    true
                }
                _ => false
            }
        });
        handles.push(handle);
    }
    
    let mut success_count = 0;
    for handle in handles {
        if let Ok(true) = handle.await {
            success_count += 1;
        }
    }
    
    // Verify that most of the connections succeeded
    assert!(success_count >= NUM_CLIENTS * 8 / 10); // at least 80% succeed
}
```

## 5. Backward Compatibility

1. Keep the signatures of all public APIs unchanged
2. Provide default values for all new configuration options
3. Publish a migration guide before any major version update

## 6. Implementation Plan

1. Phase 1: replace the `Mutex` with an atomic type
2. Phase 2: improve the message-passing mechanism
3. Phase 3: optimize the connection retry mechanism
4. Phase 4: release resources promptly
5. Phase 5: add performance tests
6. Phase 6: update documentation and example code