rust_rsm/rsm/
mod.rs

1#![allow(non_camel_case_types)]
2#![allow(non_snake_case)]
3#![allow(non_upper_case_globals)]
4
5//! # RSM
6//! RSM = Realtime Software Middleware
7//! Introduction
8//! ===
//! A realtime system is defined as a system that can respond to an external request within a certain deterministic time. To achieve this goal on generic computer systems, we must adopt a realtime scheduling policy for the software system, and avoid time-consuming operations such as synchronous I/O, memory garbage collection and locking.
10//!
//! RSM is a lightweight realtime middleware implementation written in Rust that supports an event-driven, message-oriented, lock-free programming model. In RSM, every software module is a **component**, which is normally a Finite State Machine that mainly processes an event loop. Each component can be instantiated as several tasks; each task is mapped to a dedicated **OS thread** and has its own message queue.
12//!
//! Developers can set each task's scheduling priority and message queue length individually, usually based on the service model and the performance & latency requirements.
14//!
15//! RSM is suitable for the following applications:
16//! ----
17//! - network device control plane, e.g. routing protocol, service control
18//! - embedded system application
19//! - remote control system
20//! - realtime telemetry and instrumentation
21//!
22//! Programming
23//! ===
24//!
25//! Concept
26//! ---
27//!
28//! each RSM component must implement the **rsm::Runnable** trait and provides a task creation Callback function.
29//!
30//! the code in *main.rs* is a sample RSM application implementation.
31//!
//! pub trait Runnable {
//!
//!    fn on_init(&mut self,cid:&rsm_component_t);
//!
//!    fn on_timer(&mut self,cid:&rsm_component_t,timer_id:rsm_timer_id_t,timer_data:usize);
//!
//!    fn on_socket_event(&mut self,cid:&rsm_component_t,event:rsm_socket_event_t);
//!
//!    fn on_message(&mut self,cid:&rsm_component_t,msg_id:rsm_message_id_t,msg:&rsm_message_t);
//!
//!    fn is_inited(&self)->bool;
//!
//!    fn on_close(&mut self,cid:&rsm_component_t);
//!
//! }
43//!
44//! *type rsm_new_task=fn(cid:&rsm_component_t)->&'static mut dyn Runnable*
45//!
46//!
47//! Initialize the RSM
48//! ---
//! Use the *rsm_init* function to initialize the RSM system; the application can then register its components with RSM.
50//!
51//! rsm_init_cfg_t is the RSM's configuration file, which is in json format.
52//! rsm_init(conf:&config::rsm_init_cfg_t)->errcode::RESULT
53//!
54//! *pub fn registry_component(cid:u32,attrs:&component_attrs_t,callback:rsm_new_task)->errcode::RESULT*
55//!
//! After component registration is finished, the *start_rsm()* function should be called to run the system.
57//!
//! Runtime
//! ---
//! Every running task can be identified uniquely by **rsm_component_t**
//!
//! Tasks can send messages to each other, either as a normal message or as a high priority message
//! *pub fn send_asyn_msg(dst:&rsm_component_t,msg:rsm_message_t)->errcode::RESULT*
64//!
65//! *pub fn send_asyn_priority_msg(dst:&rsm_component_t,msg:rsm_message_t)->errcode::RESULT*
66//!
//! On the receiver side, the application uses *msg.decode::<T>()* to restore the message to the application-defined type
68//!
//! RSM also provides a timer service. An application can set a timer simply by calling the **set_timer** function; once the timer is set and has expired, the rsm task will receive an on_timer event, which is defined in the Runnable trait.
70//!
71//! *pub fn set_timer(dur_msec:u64,loop_count:u64,timer_data:usize)->Option<rsm_timer_id_t>*
72//! *pub fn kill_timer_by_id(timer_id:rsm_timer_id_t)->errcode::RESULT*
73//!
74//! Diagnostic
75//! ===
76//! Developer and user can use rest api get running status and statistics
77//!
78//! Built in api
79//! ---
80//! help,*curl http://127.0.0.1:12000/rsm/help*
81//! get task running status, *curl http://127.0.0.1:12000/rsm/task?1:2*
82//! get component configuration,*curl http://127.0.0.1:12000/rsm/component?1*
83//! 
84//! Application defined OAM API
85//! ---
86//! application Module must implement *OamReqCallBack* function, and invoke *RegisterOamModule* to register self
87//! *OamReqCallBack=fn(op:E_RSM_OAM_OP,url:&String,param:&String)->oam_cmd_resp_t*
88//! 
89//! register a module callback, urls is a list of rest api url, the prefix /rsm and id following a "?" are not included
90//! *RegisterOamModule(urls:&[String], callback:OamReqCallBack)*
91//! 
92//! Other service& lib function
93//! ===
94//! xlog service
95//! ---
//! The xlog service is based on a client/server architecture: the client side simply sends log messages to the server, which is responsible for log file manipulation. This avoids writing to disk in the application's context, which is very important for realtime applications.
97//! 
98//! *let log = rsm::new_xlog(module_name:&str)->xlog::xlogger_t;*
99//! 
//! *log.Errorf(position, err, logDesc);*
101//! 
102//! Other thread safe algorithm and data structure
103//! ---
104//! + spin_lock_t, Atomic operation based lock.
105//! + AtomicQueue, based on spin_lock
106//! + TsIdAllocator, thread safe Id allocator
107//! + bitmap
108//! + ethernet packet parser
109//! + Ip routing table
110//! + several other network function and object wrapper
111//! 
112 
113use crate::common::{self,errcode};
114use serde::{Deserialize, Serialize};
115use std::net::{IpAddr,SocketAddr};
116use serde_json;
117
118pub mod task;
119pub mod rsm_sched;
120pub mod rsm_timer;
121pub mod os_timer;
122pub mod socket;
123pub mod config;
124pub mod xlog;
125pub mod oam;
126
/// Upper limit on the number of components.
const MAX_COMPONENT_NUM: usize = 256;
/// Name of the RSM module.
pub const RSM_MODULE_NAME: &str = "rust_rsm";

// ---- RSM common type definitions ----

/// Component identifier type.
pub type rsm_component_id_t = u32;
/// Node identifier type.
pub type rsm_node_id_t = u32;
/// Timer identifier type.
pub type rsm_timer_id_t = i32;
/// Message identifier type.
pub type rsm_message_id_t = u32;

// ---- System and user CID scope ----

/// Component id that denotes an invalid component.
pub const RSM_INVALID_CID: rsm_component_id_t = 0;
/// First CID of the range reserved for system use.
pub const RSM_SYSTEM_CID_START: rsm_component_id_t = 1;
/// Last CID of the range reserved for system use.
pub const RSM_SYSTEM_CID_END: rsm_component_id_t = 1023;
/// First CID of the user range.
pub const RSM_USER_CID_START: rsm_component_id_t = 1024;

// ---- Limits ----

/// Maximum number of instances per CID.
pub const RSM_MAX_INST_PER_CID: usize = 16;
/// Maximum allowed message queue length.
pub const RSM_MAX_QUEUE_LEN: usize = 16_384;
/// Maximum allowed message length.
pub const RSM_MAX_MESSAGE_LEN: usize = 64_000;

/// Timer id that denotes an invalid timer.
pub const RSM_INVALID_TIMER_ID: rsm_timer_id_t = -1;
152/// describe the task schedule priority, the REALTIME Priority is mapped to Linux/Windows Realtime priority
153#[derive(Copy,Clone,PartialEq,Debug,Eq,Serialize)]
154pub enum E_RSM_TASK_PRIORITY {
155    THREAD_PRI_LOW = 0,
156	THREAD_PRI_NORMAL = 1,
157	THREAD_PRI_HIGH = 2,
158	THREAD_PRI_REALTIME = 3,
159	THREAD_PRI_REALTIME_HIGH = 4,
160    THREAD_PRI_REALTIME_HIGHEST = 5,
161}
162
163/// identifier for a software module running instance, include the software module unique id and an instance id
164/// in RSM, every software module running instance(component instance or task) is a Finite State Machine(FSM),
165///  which mapped to an OS native thread, process message event loop
166#[derive(Eq,PartialEq,Hash,Clone,Debug,Copy)]
167pub struct rsm_component_t {
168    cid:rsm_component_id_t,
169    node_id:rsm_node_id_t,
170    inst_id:usize,
171}
172
173impl rsm_component_t {
174    pub fn new(id:rsm_component_id_t,node_id:u32,inst_id:usize)->Self {
175        return Self {
176            cid:id,
177            node_id,
178            inst_id,
179        }
180    }
181    pub fn new_zero()->Self {
182        return Self { cid: 0, node_id: 0, inst_id: 0 }
183    }
184    pub fn get_cid(&self)->rsm_component_id_t {
185        self.cid
186    }
187    pub fn get_inst_id(&self)->usize {
188        self.inst_id
189    }
190}
191
192impl std::fmt::Display for rsm_component_t {
193    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
194        write!(f, "(node_id={}, cid={},inst_id={})", self.node_id,self.cid, self.inst_id)
195    }
196}
197
/// Socket event bit flags.
pub type SOCKET_EVENT = u32;
/// The socket is readable.
pub const SOCK_EVENT_READ: SOCKET_EVENT = 1 << 0;
/// The socket is writable.
pub const SOCK_EVENT_WRITE: SOCKET_EVENT = 1 << 1;
/// A new socket was created, usually a TCP client connection.
pub const SOCK_EVENT_NEW: SOCKET_EVENT = 1 << 2;
/// Connection error.
pub const SOCK_EVENT_ERR: SOCKET_EVENT = 1 << 3;
/// The socket has been closed by the remote peer.
pub const SOCK_EVENT_CLOSE: SOCKET_EVENT = 1 << 4;
210
211#[derive(Clone,Debug,Serialize,Deserialize)]
212pub struct rsm_socket_event_t {
213    pub socket_id:i32,
214    pub sock_type:socket::SOCKET_TYPE,
215    pub event:SOCKET_EVENT,
216    
217}
218///Task create callback function, which must return a valid object reference implement **Runnale** trait
219pub type rsm_new_task=fn(cid:&rsm_component_t)->&'static mut dyn Runnable;
220///Component must implement the Runnable Trait
221pub trait Runnable {
222    ///task init, called first when the task instance is created
223    fn on_init(&mut self,cid:&rsm_component_t);
224    /// called when a timer expiry event occured, timer_id indicate which timer fired
225    fn on_timer(&mut self,cid:&rsm_component_t,timer_id:rsm_timer_id_t,timer_data:usize);
226    /// socket event, if the task use rsm socket to send/recv message
227    /// upon recv this message, task should use correspondant Upd/Tcp/Raw Socket to recv packet, util no more packet
228    /// rsm automatically accept the tcp connection request from client, the notify the app, app can close the socket to reject the connection
229    fn on_socket_event(&mut self,cid:&rsm_component_t,event:rsm_socket_event_t);
230
231    ///an ordinary message received, the app should call msg.decode method to get original data structure
232    fn on_message(&mut self,cid:&rsm_component_t,msg_id:rsm_message_id_t,msg:&rsm_message_t);
233    //return true, if component has been initialized
234    fn is_inited(&self)->bool;
235    ///task has been destroyed, reserved for future use
236    fn on_close(&mut self,cid:&rsm_component_t);
237}
238
239/// describe the component attribute while register to the RSM
240#[derive(Eq,PartialEq,Clone,Serialize)]
241pub struct component_attrs_t {
242    pub cid:rsm_component_id_t,    
243    pub name:String,
244    pub inst_num:usize, //实例数量
245    pub qlen:usize,
246    pub priority:E_RSM_TASK_PRIORITY,
247    pub need_init_ack:bool,
248}
249
250impl component_attrs_t {
251    pub fn new(cid:&rsm_component_id_t,name:&str,inst_num:usize,qlen:usize,prio:E_RSM_TASK_PRIORITY,need_init_ack:bool)->Self {
252        return Self {
253            cid:cid.clone(),    
254            name:String::from(name),
255            inst_num:inst_num, //实例数量
256            qlen:qlen,
257            priority:prio,
258            need_init_ack:need_init_ack,        
259        }
260    }
261    
262}
263
// ---- Message id ranges ----

/// First message id of the range used by the system.
pub const RSM_SYS_MESSAGE_ID_START: u32 = 1;
/// Last message id of the range used by the system.
pub const RSM_SYS_MESSAGE_ID_END: u32 = 8191;
/// Start of the user message ids; applications should use message ids
/// larger than this value.
pub const RSM_USER_MESSAGE_ID_START: u32 = 8192;
/// Message id that denotes an invalid message.
pub const RSM_INVALID_MESSAGE_ID: u32 = 0;

// ---- Predefined system messages ----
// These ids are for internal use; applications should not use them.

/// Master power-on message.
pub const RSM_MSG_ID_MASTER_POWER_ON: u32 = 1;
/// Slave power-on message.
pub const RSM_MSG_ID_SLAVE_POWER_ON: u32 = 2;
/// Power-on acknowledgement message.
pub const RSM_MSG_ID_POWER_ON_ACK: u32 = 3;
/// Power-off message.
pub const RSM_MSG_ID_POWER_OFF: u32 = 4;
/// Timer message.
pub const RSM_MSG_ID_TIMER: u32 = 10;
/// Socket message.
pub const RSM_MSG_ID_SOCKET: u32 = 12;
279
280///message object
281#[derive(Clone,Debug)]
282pub struct rsm_message_t {
283    msg_id:u32,
284    timer_id:rsm_timer_id_t,
285    timer_data:usize,
286    sender:rsm_component_t,
287    msg_body:String,
288}
289impl rsm_message_t {
290    pub fn new<'de,T>(msg_id:rsm_message_id_t,body:&T)->Option<rsm_message_t> 
291    where T:Sized+Serialize+Deserialize<'de> {
292        let msg_body = match serde_json::to_string(body) {
293            Ok(s)=>s,
294            Err(_)=>return None,
295        };
296        let sender = match get_self_cid() {
297            None=>rsm_component_t::new_zero(),
298            Some(c)=>c,
299        };
300        let msg=Self {
301            msg_id:msg_id,
302            timer_id:0,
303            timer_data:0,
304            sender:sender,
305            msg_body:msg_body,
306        };
307        return Some(msg);
308    }
309
310    pub(crate) fn new_timer_msg(timer_id:rsm_timer_id_t,timer_data:usize)->Option<rsm_message_t> {
311        let sender = match get_self_cid() {
312            None=>rsm_component_t::new_zero(),
313            Some(c)=>c,
314        };
315        let msg=Self {
316            msg_id:RSM_MSG_ID_TIMER,
317            timer_id:timer_id,
318            timer_data:timer_data,
319            sender:sender,
320            msg_body:String::default(),
321        };
322        return Some(msg);
323    }
324    /// on the receiving side, using decode to restore the original data format
325    pub fn decode<'a,T>(&'a self)->Option<T>
326    where T:Deserialize<'a> {
327        match serde_json::from_slice::<T>(self.msg_body.as_bytes()) {
328            Ok(v)=>Some(v),
329            Err(_)=>None,
330        }
331    }
332}
333static mut gRsmConfig:Option<config::rsm_init_cfg_t>=None;
334///initialize rsm subsystem, which should be called before register any component
335pub fn rsm_init(conf:&config::rsm_init_cfg_t)->errcode::RESULT {
336    unsafe {
337    if gRsmConfig.is_some() {
338        return errcode::ERROR_ALREADY_EXIST
339    }
340    gRsmConfig=Some(conf.clone());
341    }
342    oam::init_oam(&conf.oam_server_addr, &conf.log_config.self_addr);
343    rsm_sched::init_scheduler(conf.max_component_num);
344    rsm_timer::init_timer();
345    //let mut log_conf = xlog::log_service_config_t::new_default();
346    
347    xlog::xlog_server::InitLogService(&conf.log_config);
348    socket::socketpool::init_socket_pool();
349    errcode::RESULT_SUCCESS
350}
351
352///after application initialize RSM and register all their running component, then invoke start_rsm
353pub fn start_rsm() {
354    println!("Start RSM, current={}",common::format_datetime(&std::time::SystemTime::now()));
355    std::thread::spawn(|| rsm_timer::start_timer_thread());
356    std::thread::spawn(|| rsm_sched::run());
357    
358}
359
360///Register a component to RSM, with the configuration is specified by attrs parameter
361/// callback is a TASK creation call back function, which is invoke by RSM before schedule the task instance
362pub fn registry_component(cid:u32,attrs:&component_attrs_t,callback:rsm_new_task)->errcode::RESULT {
363    return rsm_sched::registry_component(cid, attrs, callback)
364}
365
366/// get self component id
367pub fn get_self_cid()->Option<rsm_component_t>{
368    return rsm_sched::get_self_cid();
369}
370
371/// get the sender cid under the message receive context
372pub fn get_sender_cid()->Option<rsm_component_t>{
373    return rsm_sched::get_sender_cid()
374}
375
376///power_on or init_ack, to keep task start order, not implement yet
377pub fn power_on_ack() {
378    return rsm_sched::power_on_ack();
379}
380///send asyn message, normally put into the receiver's message queue
381pub fn send_asyn_msg(dst:&rsm_component_t,msg:rsm_message_t)->errcode::RESULT {
382    return rsm_sched::send_asyn_msg(dst, msg);
383}
384
385pub fn send_asyn_msg_ext<'de,T>(dst:&rsm_component_t,msg_id:u32,body:&T)->errcode::RESULT
386    where T:Sized+Serialize+Deserialize<'de> {
387    let msg=match rsm_message_t::new(msg_id, body) {
388        None=>return errcode::ERROR_ENCODE_MSG,
389        Some(m)=>m,
390    };
391    return rsm_sched::send_asyn_msg(dst, msg);
392}
393
394///send high priority asyn message, this type message is ensure delivery to the component before other normal message
395pub fn send_asyn_priority_msg(dst:&rsm_component_t,msg:rsm_message_t)->errcode::RESULT {
396    return rsm_sched::send_asyn_priority_msg(dst, msg);
397}
398
399///set a timer, loop for **loop_count** times every **dur_msec** milliseconds. if *loop_count* is 0, the timer will not stop util application kill the timer
400pub fn set_timer(dur_msec:u64,loop_count:u64,timer_data:usize)->Option<rsm_timer_id_t>{
401    return rsm_timer::set_timer(dur_msec, loop_count, timer_data);
402}
403
404/// stop the timer, given the timer_id returned by *set_timer* function
405pub fn kill_timer_by_id(timer_id:rsm_timer_id_t)->errcode::RESULT {
406    return rsm_timer::kill_timer_by_id(timer_id);
407}
408
409///create a xlog client instance, then using the instance to output logs
410pub fn new_xlog(module_name:&str)->xlog::xlogger_t {
411    let serv_addr = match unsafe {&gRsmConfig} {
412        None=>SocketAddr::new(IpAddr::from([127,0,0,1]),xlog::LOG_DEF_SERVICE_PORT),
413        Some(c)=>c.log_config.self_addr,
414    };
415    return xlog::xlogger::new_xlogger(module_name, 
416        &IpAddr::from([127,0,0,1]), 0, 
417        &serv_addr.ip(),serv_addr.port());
418}
419