lwleen-rpc 1.3.3

RPC (信令路由), 组件间数据通信
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
//! 🛸`信令路由`
//! 
//! - 🛸 可扩展    
//! `demo代码` [https://docs.rs/crate/lwleen-rpc/latest/source/src/mod_demo.rs](https://docs.rs/crate/lwleen-rpc/latest/source/src/mod_demo.rs)    
//! 
//! - 🌈 自定义服务端
//! ```no_run
//!    use async_std;
//!    let 信令路由c1 = 信令路由::new(rpc类型::none自定义);    // 用于自定义服务端
//!    let 信令路由c2 = 信令路由c1.clone();
//!    std::thread::spawn(move||{
//!        async_std::task::block_on(async move{
//!            while let Ok(msg) = 信令路由c2.core信令包_接收通道().recv().await{
//!                async_std::task::spawn( 信令路由::core信令包_消费执行(&信令路由c2, msg) );
//!            }
//!        });
//!    });
//! ```
//! 
//! > 已实现方法
//! - block_on_lite `可嵌套`
//! - spawn异步任务
//! - data_manage
//! - rpc信令路由_状态
//! - 定时器
//! 

use std::{
    pin::Pin,
    task::{Context, Poll},
    panic::AssertUnwindSafe,
};
use pin_project_lite::pin_project;
use futures_lite::future::block_on;
use crate::prelude::*;
use crate::mod_timer::*;
use crate::mod_tool::*;

// use std::panic::AssertUnwindSafe;                                //catch_unwind, 
use futures::{
    StreamExt,  FutureExt,  // FutureExt, FusedStream, Stream
    future::{ Future, BoxFuture, FusedFuture },
};



pub type 信令ID = u64;
// type 可存储_信令异步执行函数 = Box<dyn Fn(信令路由, 信令ID, Box信令特征) -> Pin<Box<dyn Future<Output = ()> + Send + 'static>> + Send + 'static>  ;
// type 异步Future = Pin<Box<dyn Future<Output = anyhow_Result<Box信令特征>> + Sync + Send + 'static>>;
pub type Box信令特征 = Box<dyn 信令特征>;
// pub type 信令异步执行函数 = Pin<Box< dyn Future<Output = anyhow_Result<()>> + Send + 'static >>;   //允许跨线程传递(Send)  跨线程共享(Sync)    
pub type 信令异步执行函数 = BoxFuture<'static, anyhow_Result<Box<dyn 信令特征>>>;
pub enum 信令发送包 {
    信令包(信令ID, Box信令特征), // 通知类型 没有参数  没有返回
    Fut异步任务包(BoxFuture<'static,()>),
}
enum 信令等待包 {
    // 异步通道(async_Sender<anyhow_Result<Box信令特征>>),          
    异步通道oneshot(oneshot::Sender<anyhow_Result<Box信令特征>>),                           //有结果用通道发送结果
    回调通道(Box<dyn Send + FnOnce(anyhow_Result<Box信令特征>)>),                    //async 有结果调用回调
}




/// ## 信令特征
/// ```no_run
/// - Sync   即  Mutex  多线程共享访问
/// - Send   即  move   转移所有权
/// - Arc    即  Clone  共享数据
/// ```
pub trait 信令特征: Send + Any + 'static{
    // fn into_BoxAny(self:Box<Self>) -> Box<dyn Any>;
    // fn as_Any(&mut self) -> &mut dyn std::any::Any;
    // fn 信令名(&self)->String;
    // fn clone_Box(&self) -> Box信令特征;
    // fn into_Box(self) -> Box信令特征;
    fn 动态执行函数Box(self:Box<Self>,信令路由:信令路由, id:信令ID)-> 信令异步执行函数;
    fn 动态执行函数(self, 信令路由:信令路由,id:信令ID)->信令异步执行函数;
}





/// ## 信令任务
/// - 🍁 异步 `.await`  继承异步特征
/// - 🍁 同步 `.block阻塞完成()`  本地运行信令的`执行函数`
/// ```md
/// - 等价于 pub trait 信令任务<T>  where Self: Future<Output=T>
/// - `IntoFuture` 版本 pub trait 信令任务<T> : IntoFuture<Output=T>   where Self::IntoFuture: Send
/// ```
pub trait 信令任务<T> : Future<Output=anyhow::Result<T>>{   
    /// ### 🍁 阻塞等待完成
    fn block阻塞完成(self)->anyhow::Result<T>;
}
impl<F,Fut, T> 信令任务<T> for 信令任务Builder<F,Fut>
where F: FnOnce(bool)-> Fut,
      Fut: Future<Output = anyhow::Result<T>>,
{
    fn block阻塞完成(mut self)-> anyhow::Result<T>{
        match self.外部调度函数.take(){
            Some(调度函数) =>{
                match block_on(AssertUnwindSafe(调度函数(true)).catch_unwind()){
                    Ok(res)=>{  res  }
                    Err(err)=>{ Err(anyhow::anyhow!("🍎🍎🍎rust【信令任务Builder > block阻塞完成 错误】  => {err:?}"))  }
                }
            }
            None=>{ Err(anyhow::anyhow!("🍎🍎🍎rust[信令任务Builder > block阻塞完成 错误]: 外部调度函数 空")) }
        }
    }
}




pin_project!{
    pub struct 信令任务Builder<F,Fut>{
        外部调度函数: Option<F>,
        #[pin]
        inner:Option<Fut>,                    //外来的异步函数
    }
}
impl<F,Fut> 信令任务Builder<F,Fut>
    where F: FnOnce(bool)-> Fut,
          Fut: Future,
{
    pub fn new(f:F)->Self{   Self { 外部调度函数:Some(f), inner:None }     }
}
impl<F,Fut,T> Future for 信令任务Builder<F,Fut>
    where F: FnOnce(bool)-> Fut,
          Fut: Future<Output = anyhow::Result<T>>,
{
    type Output = anyhow::Result<T>;
    fn poll(self: Pin<&mut Self>, ctx: &mut Context<'_>) -> Poll<Self::Output>  {
        let mut this = self.project();
        if let Some(调度函数) = this.外部调度函数.take(){   this.inner.set(Some(调度函数(false)));   }
        
        match this.inner.as_mut().as_pin_mut(){
            None =>{  Poll::Ready(Err(anyhow::anyhow!("🍎🍎🍎rust[信令任务Builder > 异步await poll 错误]: inner为空!!!")))  }
            Some(f) =>{
                match f.poll(ctx){
                    Poll::Pending => { Poll::Pending }
                    Poll::Ready(value) => { this.inner.set(None); Poll::Ready(value) }
                }
            }
        }
    }
}
impl<F,Fut,T> FusedFuture for 信令任务Builder<F,Fut>
    where F: FnOnce(bool)-> Fut,
          Fut: Future<Output = anyhow::Result<T>>,
{
    fn is_terminated(&self) -> bool {  self.外部调度函数.is_none() && self.inner.is_none()   }
}



// use crate::{ 全局_信令路由 };
// /// 将 Future 发送到异步环境运行
// pub trait Fut信令路由执行特征 <R: Send + 'static> {  
//     fn pool异步任务_block阻塞完成(self) -> R; 
//     fn spawn信令路由任务(self); 
// }
// impl<T, R: Send + 'static> Fut信令路由执行特征<R> for T where T: Future<Output = R> + Send + 'static {
//     /// - pool线程池内运行
//     fn pool异步任务_block阻塞完成(self) -> R{
//         全局_信令路由.get().expect("全局_信令路由  不存在").pool异步任务_block阻塞完成(self).expect("全局_信令路由  执行错误")
//     }
//     /// - 仅发送任务到服务端,不等待,不阻塞
//     fn spawn信令路由任务(self){
//         全局_信令路由.get().expect("全局_信令路由  不存在").spawn异步任务(async move{   let _ = self.await;  })
//     }
// }
// pub static 全局_信令路由: OnceCell<信令路由> = OnceCell::new();               //自定义延迟初始化




pub struct rpc内部{
    信令id: AtomicU64,
    通道组: (async_Sender<信令发送包>, async_Receiver<信令发送包>),    //多发送者  多接收
    存取数据管理: StateManager,
    等待的任务表: Mutex<HashMap<信令ID, 信令等待包>>,   //多线程访问

    pub 路由uuid: String,
    pub 服务端类型:rpc类型,
    // pub 服务端类型: RwLock<rpc类型>,
    pub log记录:   RwLock<Vec<String>>,
    pub 数据流通道表:DashMap<信令ID, Vec< async_Sender<Box信令特征> >  >, 
    pub 任务流通道表:DashMap<String, Vec< async_Sender<Box信令特征> >  >,
    
    pub pool线程池:ThreadPool,
    #[cfg(all(feature = "mod_gtk", any(target_os = "macos", target_os = "linux", target_os = "windows")))]
    pub gtk环境loop:glib::MainLoop,
}
impl rpc内部{
    #[inline(always)]
    fn inner生成id(&self)-> 信令ID{ self.信令id.fetch_add(1, Ordering::Relaxed)    }
    #[cfg(all(feature = "mod_gtk", any(target_os = "macos", target_os = "linux", target_os = "windows")))]
    pub fn inner_gtk_MainContext(&self)-> glib::MainContext{  self.gtk环境loop.context() }
    
    pub fn inner路由uuid(&self)-> &String{  &self.路由uuid  }
    pub fn inner添加记录(&self,描述:impl Into<String>){  self.log记录.write().push(描述.into());   }
    pub fn inner数据流通道表(&self)-> &DashMap<信令ID, Vec<async_Sender<Box信令特征>>>{  &self.数据流通道表   }
    pub fn inner任务流通道表(&self)-> &DashMap<String, Vec<async_Sender<Box信令特征>>>{  &self.任务流通道表  }
    pub fn inner服务端类型(&self)-> &rpc类型{  &self.服务端类型  }
    pub fn inner判断无服务端(&self)->bool{  self.服务端类型 == rpc类型::default无服务端 }
    pub fn rpc信令路由_状态(&self)->rpc路由状态{
        rpc路由状态{
            路由uuid:  self.路由uuid.clone(),
            服务端类型: self.inner服务端类型().to_string(),
            版本:     env!("CARGO_PKG_VERSION").to_string(),
            作者:     env!("CARGO_PKG_AUTHORS").to_string(),
            主页:     env!("CARGO_PKG_HOMEPAGE").to_string(),
            // 打包时间: env!("BUILD_TIME").to_string(),
            文件所在: std::env::current_exe().unwrap_or_default().display().to_string(),
            CPU数量: *全局_CPU数量,
            信令计数: self.信令id.load(Ordering::Relaxed),
            等待的信令计数:self.等待的任务表.lock().len(),
            // 等待的信令数组:self.等待的任务表.lock().iter().map(|(id,等待包)|{  format!("{id}") }).collect::<Vec<_>>() ,
            rusty_pool状态:format!("{}{} 当前存活线程 / {} 空闲线程",self.pool线程池.get_name(),self.pool线程池.get_current_worker_count(), self.pool线程池.get_idle_worker_count()),
            启动时间: TIME时间::time_毫秒转日期(*全局_启动时间).unwrap_or_default(),
            时间戳:   TIME时间::now_毫秒(),
            log记录:  self.log记录.read().clone(),
        }
    }
}
impl std::ops::Deref for 信令路由 {    //可以访问 内部函数
    type Target = rpc内部;
    fn deref(&self) -> &Self::Target {
        &self.arc数据
    }
}


/// ## 🛸 信令路由 (rpc)
/// - `default()` 新建  即 `rpc类型::default无服务端`
/// - `new()` 新建
/// - `core信令包_消费执行()`   自定义服务端
/// - `core信令包_接收通道()`   自定义服务端
/// - `inner数据流通道表()` 通用的信令通道,u64为标识符
/// - `inner任务流通道表()` 通用的信令通道,u64为标识符
/// - `inner_gtk_MainContext()`  gtk运行环境 glib::MainContext
/// - `inner服务端类型()` 获取服务端类型
/// - `inner添加记录()` 用于记录信息
/// ### 💓 核心
/// - `task唤醒_等待的事件()` 手动唤醒等待的事件
/// - `task发布信令_单向()`  不需要回复的信令
/// - `task发布信令_回调()`  带有回调函数的信令
/// - `task发布信令_同步()`  
/// - `task发布信令_异步()`  异步执行信令
/// - `task发布_信令任务()`  信令任务  .await  .block阻塞完成()
/// ### 💝 核心宏
/// - `impl信令特征!()`     为信令表 实现`信令特征`
/// - `core发布信令任务!()` 快速 创建一个信令任务
/// ### 任务相关
/// - `block_on_lite()`  futures_lite 异步执行器
/// - `spawn异步任务()`   发送一个异步函数 到服务端执行
/// - `pool()`     引用rusty_pool
/// - `pool同步任务()` pool中新建执行同步函数
/// - `pool异步任务()` pool中新建执行异步函数
/// - `pool异步任务_block阻塞完成()`  发送一个异步函数到 pool线程池执行(阻塞完成)
/// - `pool同步任务_block阻塞完成()`  发送一个同步函数到 pool线程池执行(阻塞完成)
/// ### 🧊 数据管理
/// - `data_manage()`  管理数据
/// - `data_unmanage()` 移除管理数据
/// - `data_get()`     获取数据
/// - `data_try_get()` 获取数据
/// ### 时间
/// - `time延时任务()`  延时任务
/// - `Timeout()`      执行任务一次
/// - `Interval()`     循环执行任务
/// ### 🍰状态
/// - `rpc信令路由_状态()` 获取工作状态
/// - `on阻塞_主线程()`   阻塞主线程
/// - `emit退出主线程()`  退出主线程
#[derive(Clone)]
pub struct 信令路由{  pub arc数据:Arc<rpc内部>  }
impl Default for 信令路由{
    /// ## default
    /// > `default` 即 `rpc类型::default无服务端`
    fn default() -> Self {  信令路由::new(rpc类型::default无服务端)  }
}






/// ## rpc类型 (服务端类型)
#[derive(Clone,PartialEq,EnumDisplay)]
pub enum rpc类型{
    /// - default无服务端, 直接调用异步函数, - 🍁spawn异步任务🍁 会使用rusty_pool线程池
    default无服务端,
    /// - none自定义  🍁用于自定义环境
    none自定义,
    /// - 🍁🍁🍁单线程注意阻塞, 避免卡死(如下), 单线程中应该使用 `异步通道` `异步锁`
    /// ```no_run
    /// let (tx,rx) = async_bounded(1);
    /// 信令路由c1.spawn异步任务(async move{  tx.send(1).ok();  });  //派发的任务
    /// let _ = rx.recv();               //阻塞单线程, 导致`派发的任务`永远无法执行
    /// ```
    futures单线程,
    rusty_pool线程池,
    /// ### 🍁某些UI界面框架 和 这个冲突, 不能同时使用
    #[cfg(all(feature = "mod_gtk", any(target_os = "macos", target_os = "linux", target_os = "windows")))]
    gtk异步环境,
}

impl 信令路由{
    /// ## 🌈可自定义服务端
    /// ```no_run
    ///use async_std;
    ///let 信令路由c1 = 信令路由::new(rpc类型::none自定义);    // 用于自定义服务端
    ///let 信令路由c2 = 信令路由c1.clone();
    ///std::thread::spawn(move||{
    ///    async_std::task::block_on(async move{
    ///        while let Ok(msg) = 信令路由c2.core信令包_接收通道().recv().await{
    ///            async_std::task::spawn( 信令路由::core信令包_消费执行(&信令路由c2, msg) );
    ///        }
    ///    });
    ///});
    /// ```
    pub fn new(类型: rpc类型) -> Self {
        let mut 信令路由new = Self{
            arc数据: Arc::new(rpc内部{
                    信令id: AtomicU64::new(0),
                    通道组: async_unbounded(),
                    存取数据管理:StateManager::new(),
                    等待的任务表: Mutex::new(HashMap::new()),

                    路由uuid: TOOL工具::uuid随机().to_string(),
                    服务端类型:类型.clone(),
                    // 服务端类型:RwLock::new(类型.clone()),
                    log记录:RwLock::new(Vec::new()),
                    数据流通道表:DashMap::new(),
                    任务流通道表:DashMap::new(),
                    
                    pool线程池: ThreadPool_Builder::new().name("rpc服务端_rusty_pool线程池".into()).build(),
                    #[cfg(all(feature = "mod_gtk", any(target_os = "macos", target_os = "linux", target_os = "windows")))]
                    gtk环境loop: {
                        let main_ctx = glib::MainContext::ref_thread_default();        //获取当前线程的默认主上下文并增加其引用计数
                        let main_loop = glib::MainLoop::new(Some(&main_ctx), false);
                        let main_loop_c1 = main_loop.clone();
                        let _ = std::thread::Builder::new().name("线程_gtk_loop".to_string()).spawn(move ||{  main_loop_c1.run(); }); 
                        main_loop
                    },
            })
        };

        fn inner_生成服务端(信令路由ins:&mut 信令路由,类型:rpc类型){
            match 类型 {
                rpc类型::default无服务端    =>{   信令路由ins.inner添加记录("default无服务端") ;    }
                rpc类型::none自定义         =>{   信令路由ins.inner添加记录("自定义rpc服务端") ;    }
                rpc类型::futures单线程      =>{
                    let 信令路由c1 = 信令路由ins.clone();
                    std::thread::Builder::new().name("rpc服务端_单线程异步环境futures".to_string()).spawn(move || {
                        block_on_lite(async move{
                            信令路由c1.inner添加记录("rpc服务端: 单线程 futures_lite::future::block_on");
                            let mut 并行任务 = futures::stream::FuturesUnordered::new();  
                            loop{
                                select! {
                                    () = 并行任务.select_next_some() => {},
                                    ok_msg = 信令路由c1.core信令包_接收通道().recv().fuse() =>{         //收到msg,到<并行任务>里被执行,需要 3 ~ 5 微秒
                                        if let Ok(msg) = ok_msg{ 并行任务.push(信令路由::core信令包_消费执行(&信令路由c1,msg)); }
                                        else{ break; }      
                                    },
                                    complete => {}, 
                                }
                            }
                        });
                    }).ok();
                }
                rpc类型::rusty_pool线程池   =>{
                    let 信令路由c1 = 信令路由ins.clone();
                    信令路由ins.arc数据.pool线程池.spawn(async move{
                        信令路由c1.inner添加记录("rpc服务端: 多线程 rusty_pool");
                        while let Ok(msg) = 信令路由c1.core信令包_接收通道().recv().await{
                            let 信令路由c2 = 信令路由c1.clone();
                            信令路由c1.arc数据.pool线程池.spawn(async move{
                                信令路由::core信令包_消费执行(&信令路由c2,msg).await;
                            });
                        }
                    });
                }
                #[cfg(all(feature = "mod_gtk", any(target_os = "macos", target_os = "linux", target_os = "windows")))]
                rpc类型::gtk异步环境        =>{
                    // - 服务端_异步环境-------- (不支持 gio 异步函数, gio需要在 gio::glib::MainContext 环境里运行)  ---- #[cfg(debug_assertions)]  此函数只在开发环境中存在
                    // - 用于其他线程派发任务   glib_context.spawn_from_within(move||信令路由::core信令包_消费执行(信令路由c2, msg));
                    let 信令路由c1 = 信令路由ins.clone();
                    信令路由ins.inner_gtk_MainContext().spawn(async move{
                        信令路由c1.inner添加记录("rpc服务端: gtk环境loop");
                        while let Ok(msg) = 信令路由c1.core信令包_接收通道().recv().await{
                            let 信令路由c2 = 信令路由c1.clone();
                            信令路由c1.inner_gtk_MainContext().spawn(async move{
                                信令路由::core信令包_消费执行(&信令路由c2,msg).await;
                            });
                        }
                    });
                }
            }
        }
        inner_生成服务端(&mut 信令路由new, 类型);

        // 全局_信令路由.get_or_init(||信令路由new.clone());
        信令路由new
    }
    /// ## 信令包消费
    ///  - 此函数应该在**服务端**调用, 处理**信令包**
    pub async fn core信令包_消费执行(信令路由ref:&信令路由, 信令包:信令发送包){
        match 信令包{
            信令发送包::信令包(id, 单个信令) => {
                let 结果 = match AssertUnwindSafe(单个信令.动态执行函数Box(信令路由ref.to_owned(), id)).catch_unwind().await{
                    Ok(res)=>{ res }
                    Err(err)=>{ Err(anyhow_err!("🍎🍎🍎rust【信令路由 > core信令包_消费执行 > 信令包 id:{id} => {err:?}")) }
                };
                信令路由ref.task唤醒_等待的事件(id,结果);
            }
            信令发送包::Fut异步任务包(异步任务)=>{
                if let Err(err) = AssertUnwindSafe(异步任务).catch_unwind().await{
                    eprintln!("🍎🍎🍎rust【信令路由 > core信令包_消费执行 > Fut异步任务包】  => {err:?}")
                }
            }
        }
    }
    #[inline(always)]
    pub fn core信令包_接收通道(&self) -> &async_Receiver<信令发送包> {        &self.arc数据.通道组.1    }
    #[inline(always)]
    fn core信令包_发送通道(&self) -> &async_Sender<信令发送包>   {      &self.arc数据.通道组.0    }
    
    pub fn task唤醒_等待的事件(&self, id: 信令ID,  result:  anyhow_Result<Box信令特征> ) {         //----取出----等待的任务,并回复
        self.arc数据.等待的任务表.lock().remove(&id)
            .map(|等待包|{
                match 等待包 {         // 信令等待包::异步通道(tx)        => {  tx.force_send(result).ok();  }
                    信令等待包::异步通道oneshot(tx) => {  tx.send(result).ok();             }
                    信令等待包::回调通道(f)   => {  f(result);                        }
                }
            });
    }
    pub fn task发布信令_单向(&self, 单向信令: impl 信令特征){
        let id = self.inner生成id();
        match self.inner服务端类型() {
            rpc类型::default无服务端=>{ block_on_lite(信令路由::core信令包_消费执行(self, 信令发送包::信令包(id, Box::new(单向信令))));  }  /*在此处 解包执行*/   
            _=>{  let _ = self.core信令包_发送通道().force_send(信令发送包::信令包(id, Box::new(单向信令)));  }  /*在服务端 解包执行*/  
        }
    }
    pub fn task发布信令_回调(&self, 双向信令: impl 信令特征,  f: impl Send + FnOnce(anyhow_Result<Box信令特征>) + 'static ){
        let id = self.inner生成id();
        //任务执行后, `唤醒任务`会查找`等待的任务表`发送结果
        self.arc数据.等待的任务表.lock().insert(id, 信令等待包::回调通道(Box::new(f)));
        match self.inner服务端类型() {
            rpc类型::default无服务端=>{ block_on_lite(信令路由::core信令包_消费执行(self, 信令发送包::信令包(id, Box::new(双向信令))));  }  /*在此处 解包执行*/   
            _=>{  let _ = self.core信令包_发送通道().force_send(信令发送包::信令包(id, Box::new(双向信令)));  }  /*在服务端 解包执行*/  
        }
    }
    pub fn task发布信令_同步(&self,双向信令: impl 信令特征)-> anyhow_Result<Box信令特征>{
        let (tx,rx) = async_bounded(1);
        self.task发布信令_回调(双向信令,move|结果|{ tx.force_send(结果).ok();  });
        rx.recv_blocking()?
    }
    pub async fn task发布信令_异步(&self, 双向信令: impl 信令特征) -> anyhow_Result<Box信令特征>{
        let (id, (tx,rx)) = (self.inner生成id(), oneshot::channel());
        self.arc数据.等待的任务表.lock().insert(id, 信令等待包::异步通道oneshot(tx));
        match self.inner服务端类型() {
            rpc类型::default无服务端=>{  信令路由::core信令包_消费执行(self, 信令发送包::信令包(id, Box::new(双向信令))).await;  }  /*在此处 解包执行*/   
            _=>{  let _ = self.core信令包_发送通道().send(信令发送包::信令包(id, Box::new(双向信令))).await;  }  /*在服务端 解包执行*/  
        }
        rx.await?
    }
    /// ## 宏  core发布信令任务!()
    pub fn task发布_信令任务<T , F>(信令路由obj:信令路由, 双向信令: impl 信令特征, 解析信令函数:F ) -> impl 信令任务<T>
        where F: FnOnce(anyhow_Result<Box信令特征>) -> anyhow_Result<T>  
    {
        信令任务Builder::new(move|本地运行bool| async move{
            解析信令函数(
                if 本地运行bool || 信令路由obj.inner判断无服务端(){
                    双向信令.动态执行函数(信令路由obj, 0).await     /*在此处 解包执行*/
                }else{
                    信令路由obj.task发布信令_异步(双向信令).await     /*在服务端 解包执行*/
                }
            )  
        })
    }
    
    
    /// ### 不阻塞 不等待 仅发送`异步任务`到服务端运行
    /// - 🍁default无服务端🍁 会使用rusty_pool线程池      /// 仅发送任务到服务端,不等待,不阻塞
    pub fn spawn异步任务(&self, 异步任务: impl Future<Output = ()> + Send + 'static){
        match self.arc数据.服务端类型{
            rpc类型::default无服务端=>{  self.arc数据.pool线程池.spawn(异步任务);     }
            _=>{  let _ = self.core信令包_发送通道().force_send(信令发送包::Fut异步任务包(异步任务.boxed()));     }
        }
    }
    /// - ### 🍁 阻塞等待完成  futures_lite::block_on  执行异步任务
    pub fn block_on_lite<R>(异步任务: impl Future<Output = R>)->R{   block_on_lite(异步任务) }
    /// - 引用 rusty_pool
    #[inline(always)]
    pub fn pool(&self)-> &ThreadPool{ &self.arc数据.pool线程池 }
    pub fn pool同步任务<T:Send + 'static>(&self,同步任务:impl FnOnce() -> T + Send + 'static)->oneshot::Receiver<T>{
        self.arc数据.pool线程池.evaluate(同步任务).receiver
    }
    /// ## 任务 await 等待时, 会让出线程
    pub fn pool异步任务<T:Send + 'static>(&self,异步任务:impl Future<Output = T> + 'static + Send)->oneshot::Receiver<T>{
        self.arc数据.pool线程池.spawn_await(异步任务).receiver
    }
    /// ### 🍁 阻塞等待完成  在pool线程池中完成
    pub fn pool异步任务_block阻塞完成<T: Send + 'static>(&self, 异步任务:impl Future<Output = T> + Send + 'static) -> anyhow_Result<T>{
        let rx = self.arc数据.pool线程池
            .try_spawn_await(异步任务).map_err(|err|anyhow_err!("{err}"))?
            .receiver;
        block_on_lite(rx).map_err(|err|anyhow_err!("{err}"))
    }
    /// ### 🍁 阻塞等待完成  在pool线程池中完成
    pub fn pool同步任务_block阻塞完成<T: Send + 'static>(&self, 同步任务:impl FnOnce() -> T + Send + 'static) -> anyhow_Result<T>{
        let rx = self.arc数据.pool线程池
            .try_evaluate(同步任务).map_err(|err|anyhow_err!("{err}"))?
            .receiver;
        block_on_lite(rx).map_err(|err|anyhow_err!("{err}"))
    }


    /// 存取数据管理
    pub fn data_manage<T>(&self, state: T) -> bool  where T: Send + Sync + 'static, { self.arc数据.存取数据管理.set(state) }
    pub fn data_unmanage<T>(&self) -> Option<T>     where T: Send + Sync + 'static, { unsafe { self.arc数据.存取数据管理.unmanage() } }
    pub fn data_get<T>(&self) -> State<'_, T>       where  T: Send + Sync + 'static, { self.data_try_get().unwrap_or_else(|| { panic!("state() called before manage() for {}", std::any::type_name::<T>() )   })   }
    pub fn data_try_get<T>(&self) -> Option<State<'_, T>>  where T: Send + Sync + 'static, { self.arc数据.存取数据管理.try_get() }

    
    pub fn time延时任务(&self,时间ms:u64)-> impl 信令任务<()>{
        core发布信令任务!(self, 信令表::延时器{time_id:0,延时ms:时间ms},  信令表::延时器_回复 )
    }
    pub fn Timeout(&self, 定时ms:u64 , 回调函数:impl Fn(定时器) + Send  + 'static)->定时器{
        定时器::new(self.clone(),定时ms,false, 回调函数)
    }
    pub fn Interval(&self, 定时ms:u64 , 回调函数:impl Fn(定时器) + Send  + 'static )->定时器{
        定时器::new(self.clone(),定时ms,true, 回调函数)
    }
    
    
    pub fn rpc信令路由_心跳(&self,信息: impl Into<String>)-> impl 信令任务<(String,u128)>{
        core发布信令任务!(self, 信令表::路由心跳{ 信息:信息.into() }, 信令表::路由心跳_回复 => 信息,时间戳ns)
    }

    pub fn shutdown(self){
        #[cfg(all(feature = "mod_gtk", any(target_os = "macos", target_os = "linux", target_os = "windows")))]
        self.arc数据.gtk环境loop.quit();
        self.arc数据.pool线程池.clone().shutdown_join();
        drop(self)
    }
}


#[cfg(not(all(target_arch = "wasm32", feature = "wasm-bindgen")))]
impl 信令路由{
    pub fn emit退出主线程(&self,code:i32, 退出原因:Option<String>){
        if let Some(通道数组) = self.inner任务流通道表().get(&self.arc数据.路由uuid){
            if 通道数组.len()>=1{  通道数组[0].force_send(Box::new(信令表::退出进程 { code , 退出原因 })).ok();    }
        }
    }
    pub fn on阻塞_主线程(&self)->anyhow_Result<()>{     //必须在主线程调用
        let (发送tx,接收rx) =  async_unbounded();
        self.inner任务流通道表().entry(self.arc数据.路由uuid.clone())
                        .or_insert(Vec::new())
                        .push(发送tx);
        
        while let Ok(信令) = 接收rx.recv_blocking() {
            match 信令.try_into(){
                Err(err)=>{   eprintln!("on阻塞_主线程 [错误] 来自其它信令表:{err}");  }
                Ok(信令表)=>{
                    match 信令表{
                        信令表::退出进程 { code, 退出原因 }=>{
                            退出原因.map(|原因|{ eprintln!("进程退出:{code} <{原因}>"); });
                            break;
                        }
                        _=>{}
                    }
                }
            }
        }
        Ok(())
    }
}


//常用的方法
impl 信令路由{
    /// 关联函数
    pub fn fn_立即退出当前进程(code: i32){  std::process::exit(code);  }
    pub fn fn_timestamp()->u128{   TIME时间::now_毫秒()       }
    pub fn fn_time_日期()->String{  TIME时间::time_日期()      }
    pub fn fn_time_to可读<T:Into<u128>>(毫秒ms:T)->String{ TIME时间::time_毫秒转日期(毫秒ms.into()).unwrap_or_default() }
    pub fn fn_数字to时间字符串(纳秒 :u128, 纳秒级别:bool)->String{ TIME时间::time_纳秒转时间字符串(纳秒, 纳秒级别)  }
    pub fn fn_时间to时间字符串(now:std::time::Instant,纳秒级别:bool)->String{  TIME时间::time_纳秒转时间字符串(now.elapsed().as_nanos(), 纳秒级别)  }
    /// - uuid v7
    pub fn fn_uuid随机()->uuid::Uuid{     TOOL工具::uuid随机()     }
    pub fn uuid随机(&self)->uuid::Uuid{   TOOL工具::uuid随机()     }
    pub fn timestamp(&self)->u128{       TIME时间::now_毫秒()     }
    pub fn time_日期(&self)->String{      TIME时间::time_日期()    }
    pub fn time_to可读<T:Into<u128>>(&self,毫秒ms:T)->String{ TIME时间::time_毫秒转日期(毫秒ms.into()).unwrap_or_default() }
}



#[derive(Clone, Serialize, Deserialize)]//可选特征
enum 信令表{
    路由心跳{      信息:String  },
    路由心跳_回复{  信息:String, 时间戳ns:u128  },
    
    延时器{ time_id:原子ID, 延时ms:u64 },
    延时器_回复,
    
    信令表_完成,
    退出进程{ code:i32, 退出原因:Option<String> },
}
impl信令特征!(信令表, 执行函数);

async fn 执行函数(信令路由:信令路由,id:信令ID,单个信令:信令表)->anyhow_Result<信令表>{
    match 单个信令{
        信令表::路由心跳{ 信息  }=>{  
            anyhow_Ok信令!(信令表::路由心跳_回复{ 信息:format!("💖心跳回答: {信息}"), 时间戳ns:TIME时间::now_纳秒() })   
        }
        信令表::延时器 { time_id,延时ms }=>{
            if time_id == 0{
                let 延时 = std::time::Duration::from_millis(延时ms as u64);
                Delay::new(延时).await;
                return anyhow_Ok信令!(信令表::延时器_回复);
            }
            else if let Some(定时器记录) = 全局_定时器表.get(&time_id){
                let 定时器 = 定时器记录.clone();
                信令路由.spawn异步任务(async move{  定时器.run运行单元().await });
            }
            return anyhow_Err信令!("无需回复  信令表::延时器");
        }
        _ =>{ return anyhow_Err信令!("信令不存在 {id}"); }
    }
}



static 全局_定时器表: Lazy<DashMap<原子ID, 定时器>> = Lazy::new(|| DashMap::new());  
// type 可存储_定时器异步函数 = Box<dyn Fn(u64) -> Pin<Box<dyn Future<Output = ()> + Send + 'static>> + Send + 'static>  ;
type 可存储_定时器回调函数 = Box<dyn Fn(定时器) + Send>  ;
#[derive(Clone)]
pub struct 定时器{
    pub time_id:原子ID,   //  >0 开始
    pub 定时ms:u64,
    pub 循环:bool,
    pub 信令路由:信令路由,
    执行次数:Arc<Mutex<Cell<u64>>>,
    存在:Arc<Mutex<Cell<bool>>>,
    回调函数数组:Arc<Mutex<Vec<可存储_定时器回调函数>>>,
}
impl 定时器{
    fn new(信令路由:信令路由, 定时ms:u64,循环:bool, 回调函数:impl Fn(定时器) + Send  + 'static)->Self{
        let time_id = 原子ID对象::next().0;
        let 回调函数数组:Arc<Mutex<Vec<可存储_定时器回调函数>>> = Arc::new(Mutex::new(Vec::new()));
        回调函数数组.lock().push(Box::new(回调函数));
        let 定时器 = Self{
            信令路由, 定时ms, 循环,time_id, 
            执行次数:Arc::new(Mutex::new(Cell::new(0))),
            存在:Arc::new(Mutex::new(Cell::new(true))),
            回调函数数组,
        };
        全局_定时器表.entry(time_id).or_insert(定时器.clone());
        定时器.激活定时器();
        定时器
    }
  
    #[inline]
    fn 激活定时器(&self){ self.信令路由.task发布信令_单向(信令表::延时器{time_id:self.time_id,延时ms:self.定时ms});  }
    #[inline]
    fn set_执行次数加一(&self){ self.执行次数.lock().update(|x|x+1); }
    #[inline]
    async fn run运行单元(&self){
        if self.get_存在(){
            let 执行函数 = async{
                        let 延时 = std::time::Duration::from_millis(self.定时ms as u64);
                        Delay::new(延时).await;
                        if self.get_存在(){
                            self.set_执行次数加一();
                            for 函数记录 in self.回调函数数组.lock().iter(){
                                函数记录(self.clone());
                            }
                            if self.循环{ self.激活定时器(); }
                            else{ self.clear(); }            //非循环的只使用一次,执行一次
                        }
            };
            if let Err(错误) = AssertUnwindSafe(执行函数).catch_unwind().await {
                eprintln!("🍎🍎🍎rust【定时器catch_unwind】 {} => {错误:?}",self.time_id);   
            }
        }
    }
    
    pub fn 新增回调(&self, 回调函数:impl Fn(定时器) + Send  + 'static ){  self.回调函数数组.lock().push(Box::new(回调函数));  }
    pub fn get_存在(&self)->bool{ self.存在.lock().get()            }
    pub fn get_执行次数(&self)->u64{ self.执行次数.lock().get()  }
    pub fn start(&self){ self.存在.lock().set(true); self.激活定时器();   }
    ///暂停定时器
    pub fn stop(&self){ self.存在.lock().set(false); }
    /// 删除定时器
    pub fn clear(&self){ self.stop(); 全局_定时器表.remove(&self.time_id);  }
}








#[derive(Debug, Clone, Serialize, Deserialize,)]
pub struct rpc路由状态{
    pub 路由uuid:String,
    pub 服务端类型:String,
    pub 版本:String,
    pub 作者:String,
    pub 主页:String,
    // pub 打包时间:String,
    pub 文件所在:String,
    pub CPU数量:usize,
    pub 信令计数:信令ID,
    pub 等待的信令计数:usize,
    // pub 等待的信令数组:Vec<String>,
    pub rusty_pool状态:String,
    pub 启动时间:String,
    pub 时间戳:u128,
    pub log记录:Vec<String>,
}



#[cfg(test)]
mod  测试{
    use crate::prelude::*;

    // 🔥🔥🔥 js child_process.spawn 调用二进制,必须带有  process.env  ,否则报错🔥🔥🔥
    #[test]        
    fn 测试(){
        let 信令路由c1 =  信令路由::default();
        let 信令路由c2 =  信令路由c1.clone();
        let 结果 = 信令路由c1.pool异步任务_block阻塞完成(async move{
            let out1 = 信令路由c2.rpc信令路由_心跳("异步测试1").await;
            let out2 = 信令路由c2.rpc信令路由_心跳("同步测试2").block阻塞完成();
            (out1,out2)
        });
        println!("{结果:?}");
    }
}