rust_rsm/rsm/mod.rs
1#![allow(non_camel_case_types)]
2#![allow(non_snake_case)]
3#![allow(non_upper_case_globals)]
4
5//! # RSM
6//! RSM = Realtime Software Middleware
7//! Introduction
8//! ===
//! A realtime system is one that responds to external requests within a deterministic, bounded time. To achieve this on general-purpose computer systems, the software must adopt a realtime scheduling policy and avoid time-consuming operations such as synchronous I/O, memory garbage collection and locking.
10//!
//! RSM is a lightweight realtime middleware written in Rust that supports an event-driven, message-oriented, lock-free programming model. In RSM, every software module is a **component**, normally a Finite State Machine whose main job is to process an event loop. Each component can be instantiated as several tasks; each task is mapped to a dedicated **OS thread** and has its own message queue.
12//!
//! Developers can set each task's scheduling priority and message queue length individually, usually based on the service model and the performance and latency requirements.
14//!
15//! RSM is suitable for the following applications:
16//! ----
17//! - network device control plane, e.g. routing protocol, service control
18//! - embedded system application
19//! - remote control system
20//! - realtime telemetry and instrumentation
21//!
22//! Programming
23//! ===
24//!
25//! Concept
26//! ---
27//!
28//! each RSM component must implement the **rsm::Runnable** trait and provides a task creation Callback function.
29//!
30//! the code in *main.rs* is a sample RSM application implementation.
31//!
//! pub trait Runnable {
//!
//! fn on_init(&mut self,cid:&rsm_component_t);
//!
//! fn on_timer(&mut self,cid:&rsm_component_t,timer_id:rsm_timer_id_t,timer_data:usize);
//!
//! fn on_socket_event(&mut self,cid:&rsm_component_t,event:rsm_socket_event_t);
//!
//! fn on_message(&mut self,cid:&rsm_component_t,msg_id:rsm_message_id_t,msg:&rsm_message_t);
//!
//! fn is_inited(&self)->bool;
//!
//! fn on_close(&mut self,cid:&rsm_component_t);
//!
//! }
43//!
44//! *type rsm_new_task=fn(cid:&rsm_component_t)->&'static mut dyn Runnable*
45//!
46//!
47//! Initialize the RSM
48//! ---
//! Use the *rsm_init* function to initialize the RSM system; the application can then register its components with RSM.
50//!
51//! rsm_init_cfg_t is the RSM's configuration file, which is in json format.
52//! rsm_init(conf:&config::rsm_init_cfg_t)->errcode::RESULT
53//!
54//! *pub fn registry_component(cid:u32,attrs:&component_attrs_t,callback:rsm_new_task)->errcode::RESULT*
55//!
//! After component registration is finished, the *start_rsm()* function should be called to run the system.
57//!
58//!Runtime
59//!---
60//!every running task can be identified uniquely by **rsm_component_t**
61//!
62//!task can send message to each other, with normal message or a high priority message
//!*pub fn send_asyn_msg(dst:&rsm_component_t,msg:rsm_message_t)->errcode::RESULT*
64//!
65//! *pub fn send_asyn_priority_msg(dst:&rsm_component_t,msg:rsm_message_t)->errcode::RESULT*
66//!
//! On the receiver side, the application uses msg.decode::<T>() to restore the message to an application-defined type
68//!
//! RSM also provides a timer service. An application can set a timer simply by calling the **set_timer** function; once the timer is set and expires, the rsm task receives an on_timer event, which is defined in the Runnable trait.
70//!
71//! *pub fn set_timer(dur_msec:u64,loop_count:u64,timer_data:usize)->Option<rsm_timer_id_t>*
72//! *pub fn kill_timer_by_id(timer_id:rsm_timer_id_t)->errcode::RESULT*
73//!
74//! Diagnostic
75//! ===
//! Developers and users can use the REST API to get running status and statistics
77//!
78//! Built in api
79//! ---
80//! help,*curl http://127.0.0.1:12000/rsm/help*
81//! get task running status, *curl http://127.0.0.1:12000/rsm/task?1:2*
82//! get component configuration,*curl http://127.0.0.1:12000/rsm/component?1*
83//!
84//! Application defined OAM API
85//! ---
86//! application Module must implement *OamReqCallBack* function, and invoke *RegisterOamModule* to register self
87//! *OamReqCallBack=fn(op:E_RSM_OAM_OP,url:&String,param:&String)->oam_cmd_resp_t*
88//!
89//! register a module callback, urls is a list of rest api url, the prefix /rsm and id following a "?" are not included
90//! *RegisterOamModule(urls:&[String], callback:OamReqCallBack)*
91//!
92//! Other service& lib function
93//! ===
94//! xlog service
95//! ---
//! The xlog service is based on a client/server architecture: the client side simply sends log messages to the server, which is responsible for log file manipulation. This keeps disk writes out of the application's context, which is very important for realtime applications.
97//!
98//! *let log = rsm::new_xlog(module_name:&str)->xlog::xlogger_t;*
99//!
//! *log.Errorf(position, err, logDesc);*
101//!
102//! Other thread safe algorithm and data structure
103//! ---
104//! + spin_lock_t, Atomic operation based lock.
105//! + AtomicQueue, based on spin_lock
106//! + TsIdAllocator, thread safe Id allocator
107//! + bitmap
108//! + ethernet packet parser
109//! + Ip routing table
110//! + several other network function and object wrapper
111//!
112
113use crate::common::{self,errcode};
114use serde::{Deserialize, Serialize};
115use std::net::{IpAddr,SocketAddr};
116use serde_json;
117
118pub mod task;
119pub mod rsm_sched;
120pub mod rsm_timer;
121pub mod os_timer;
122pub mod socket;
123pub mod config;
124pub mod xlog;
125pub mod oam;
126
/// Compile-time cap on the number of components.
/// NOTE(review): not referenced anywhere in this file (`rsm_init` forwards the limit taken
/// from the configuration instead) — confirm whether this constant is still needed.
const MAX_COMPONENT_NUM: usize = 256;
/// Module name of the RSM middleware itself.
pub const RSM_MODULE_NAME: &str = "rust_rsm";

// ---- common RSM type definitions ----

/// Numeric id of a component (software module).
pub type rsm_component_id_t = u32;
/// Numeric id of a node.
pub type rsm_node_id_t = u32;
/// Timer id type.
pub type rsm_timer_id_t = i32;
/// Message id type.
pub type rsm_message_id_t = u32;

// ---- system & user component id (CID) ranges ----

/// CID value that never identifies a real component.
pub const RSM_INVALID_CID: rsm_component_id_t = 0;
/// First CID reserved for system use.
pub const RSM_SYSTEM_CID_START: rsm_component_id_t = 1;
/// Last CID reserved for system use.
pub const RSM_SYSTEM_CID_END: rsm_component_id_t = 1023;
/// First CID after the system-reserved range.
pub const RSM_USER_CID_START: rsm_component_id_t = RSM_SYSTEM_CID_END + 1;

// ---- resource limits ----

/// Maximum number of instances per CID.
pub const RSM_MAX_INST_PER_CID: usize = 16;
/// Maximum allowed message queue length.
pub const RSM_MAX_QUEUE_LEN: usize = 16 * 1024;
/// Maximum allowed message length.
pub const RSM_MAX_MESSAGE_LEN: usize = 64_000;

/// Timer id value that never identifies a real timer.
pub const RSM_INVALID_TIMER_ID: rsm_timer_id_t = -1;
152/// describe the task schedule priority, the REALTIME Priority is mapped to Linux/Windows Realtime priority
153#[derive(Copy,Clone,PartialEq,Debug,Eq,Serialize)]
154pub enum E_RSM_TASK_PRIORITY {
155 THREAD_PRI_LOW = 0,
156 THREAD_PRI_NORMAL = 1,
157 THREAD_PRI_HIGH = 2,
158 THREAD_PRI_REALTIME = 3,
159 THREAD_PRI_REALTIME_HIGH = 4,
160 THREAD_PRI_REALTIME_HIGHEST = 5,
161}
162
163/// identifier for a software module running instance, include the software module unique id and an instance id
164/// in RSM, every software module running instance(component instance or task) is a Finite State Machine(FSM),
165/// which mapped to an OS native thread, process message event loop
166#[derive(Eq,PartialEq,Hash,Clone,Debug,Copy)]
167pub struct rsm_component_t {
168 cid:rsm_component_id_t,
169 node_id:rsm_node_id_t,
170 inst_id:usize,
171}
172
173impl rsm_component_t {
174 pub fn new(id:rsm_component_id_t,node_id:u32,inst_id:usize)->Self {
175 return Self {
176 cid:id,
177 node_id,
178 inst_id,
179 }
180 }
181 pub fn new_zero()->Self {
182 return Self { cid: 0, node_id: 0, inst_id: 0 }
183 }
184 pub fn get_cid(&self)->rsm_component_id_t {
185 self.cid
186 }
187 pub fn get_inst_id(&self)->usize {
188 self.inst_id
189 }
190}
191
192impl std::fmt::Display for rsm_component_t {
193 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
194 write!(f, "(node_id={}, cid={},inst_id={})", self.node_id,self.cid, self.inst_id)
195 }
196}
197
/// Socket event type; each `SOCK_EVENT_*` constant below occupies its own bit.
pub type SOCKET_EVENT = u32;
/// The socket is readable.
pub const SOCK_EVENT_READ: SOCKET_EVENT = 1 << 0;
/// The socket is writable.
pub const SOCK_EVENT_WRITE: SOCKET_EVENT = 1 << 1;
/// A new socket was created, usually a TCP client connection.
pub const SOCK_EVENT_NEW: SOCKET_EVENT = 1 << 2;
/// The connection is in error.
pub const SOCK_EVENT_ERR: SOCKET_EVENT = 1 << 3;
/// The socket has been closed by the remote peer.
pub const SOCK_EVENT_CLOSE: SOCKET_EVENT = 1 << 4;
210
211#[derive(Clone,Debug,Serialize,Deserialize)]
212pub struct rsm_socket_event_t {
213 pub socket_id:i32,
214 pub sock_type:socket::SOCKET_TYPE,
215 pub event:SOCKET_EVENT,
216
217}
218///Task create callback function, which must return a valid object reference implement **Runnale** trait
219pub type rsm_new_task=fn(cid:&rsm_component_t)->&'static mut dyn Runnable;
220///Component must implement the Runnable Trait
221pub trait Runnable {
222 ///task init, called first when the task instance is created
223 fn on_init(&mut self,cid:&rsm_component_t);
224 /// called when a timer expiry event occured, timer_id indicate which timer fired
225 fn on_timer(&mut self,cid:&rsm_component_t,timer_id:rsm_timer_id_t,timer_data:usize);
226 /// socket event, if the task use rsm socket to send/recv message
227 /// upon recv this message, task should use correspondant Upd/Tcp/Raw Socket to recv packet, util no more packet
228 /// rsm automatically accept the tcp connection request from client, the notify the app, app can close the socket to reject the connection
229 fn on_socket_event(&mut self,cid:&rsm_component_t,event:rsm_socket_event_t);
230
231 ///an ordinary message received, the app should call msg.decode method to get original data structure
232 fn on_message(&mut self,cid:&rsm_component_t,msg_id:rsm_message_id_t,msg:&rsm_message_t);
233 //return true, if component has been initialized
234 fn is_inited(&self)->bool;
235 ///task has been destroyed, reserved for future use
236 fn on_close(&mut self,cid:&rsm_component_t);
237}
238
239/// describe the component attribute while register to the RSM
240#[derive(Eq,PartialEq,Clone,Serialize)]
241pub struct component_attrs_t {
242 pub cid:rsm_component_id_t,
243 pub name:String,
244 pub inst_num:usize, //实例数量
245 pub qlen:usize,
246 pub priority:E_RSM_TASK_PRIORITY,
247 pub need_init_ack:bool,
248}
249
250impl component_attrs_t {
251 pub fn new(cid:&rsm_component_id_t,name:&str,inst_num:usize,qlen:usize,prio:E_RSM_TASK_PRIORITY,need_init_ack:bool)->Self {
252 return Self {
253 cid:cid.clone(),
254 name:String::from(name),
255 inst_num:inst_num, //实例数量
256 qlen:qlen,
257 priority:prio,
258 need_init_ack:need_init_ack,
259 }
260 }
261
262}
263
/// First message id used by the RSM system.
pub const RSM_SYS_MESSAGE_ID_START: u32 = 1;
/// Last message id used by the RSM system.
pub const RSM_SYS_MESSAGE_ID_END: u32 = 8191;
/// Start of the user message id range; applications should use message ids larger than
/// this value.
pub const RSM_USER_MESSAGE_ID_START: u32 = RSM_SYS_MESSAGE_ID_END + 1;
/// Message id that never identifies a real message.
pub const RSM_INVALID_MESSAGE_ID: u32 = 0;

// Predefined RSM system messages for internal use; applications should not use these ids.

/// System message id: master power on.
pub const RSM_MSG_ID_MASTER_POWER_ON: u32 = 1;
/// System message id: slave power on.
pub const RSM_MSG_ID_SLAVE_POWER_ON: u32 = 2;
/// System message id: power-on acknowledgement.
pub const RSM_MSG_ID_POWER_ON_ACK: u32 = 3;
/// System message id: power off.
pub const RSM_MSG_ID_POWER_OFF: u32 = 4;
/// System message id carried by timer messages.
pub const RSM_MSG_ID_TIMER: u32 = 10;
/// System message id for socket messages.
pub const RSM_MSG_ID_SOCKET: u32 = 12;
279
280///message object
281#[derive(Clone,Debug)]
282pub struct rsm_message_t {
283 msg_id:u32,
284 timer_id:rsm_timer_id_t,
285 timer_data:usize,
286 sender:rsm_component_t,
287 msg_body:String,
288}
289impl rsm_message_t {
290 pub fn new<'de,T>(msg_id:rsm_message_id_t,body:&T)->Option<rsm_message_t>
291 where T:Sized+Serialize+Deserialize<'de> {
292 let msg_body = match serde_json::to_string(body) {
293 Ok(s)=>s,
294 Err(_)=>return None,
295 };
296 let sender = match get_self_cid() {
297 None=>rsm_component_t::new_zero(),
298 Some(c)=>c,
299 };
300 let msg=Self {
301 msg_id:msg_id,
302 timer_id:0,
303 timer_data:0,
304 sender:sender,
305 msg_body:msg_body,
306 };
307 return Some(msg);
308 }
309
310 pub(crate) fn new_timer_msg(timer_id:rsm_timer_id_t,timer_data:usize)->Option<rsm_message_t> {
311 let sender = match get_self_cid() {
312 None=>rsm_component_t::new_zero(),
313 Some(c)=>c,
314 };
315 let msg=Self {
316 msg_id:RSM_MSG_ID_TIMER,
317 timer_id:timer_id,
318 timer_data:timer_data,
319 sender:sender,
320 msg_body:String::default(),
321 };
322 return Some(msg);
323 }
324 /// on the receiving side, using decode to restore the original data format
325 pub fn decode<'a,T>(&'a self)->Option<T>
326 where T:Deserialize<'a> {
327 match serde_json::from_slice::<T>(self.msg_body.as_bytes()) {
328 Ok(v)=>Some(v),
329 Err(_)=>None,
330 }
331 }
332}
333static mut gRsmConfig:Option<config::rsm_init_cfg_t>=None;
334///initialize rsm subsystem, which should be called before register any component
335pub fn rsm_init(conf:&config::rsm_init_cfg_t)->errcode::RESULT {
336 unsafe {
337 if gRsmConfig.is_some() {
338 return errcode::ERROR_ALREADY_EXIST
339 }
340 gRsmConfig=Some(conf.clone());
341 }
342 oam::init_oam(&conf.oam_server_addr, &conf.log_config.self_addr);
343 rsm_sched::init_scheduler(conf.max_component_num);
344 rsm_timer::init_timer();
345 //let mut log_conf = xlog::log_service_config_t::new_default();
346
347 xlog::xlog_server::InitLogService(&conf.log_config);
348 socket::socketpool::init_socket_pool();
349 errcode::RESULT_SUCCESS
350}
351
352///after application initialize RSM and register all their running component, then invoke start_rsm
353pub fn start_rsm() {
354 println!("Start RSM, current={}",common::format_datetime(&std::time::SystemTime::now()));
355 std::thread::spawn(|| rsm_timer::start_timer_thread());
356 std::thread::spawn(|| rsm_sched::run());
357
358}
359
360///Register a component to RSM, with the configuration is specified by attrs parameter
361/// callback is a TASK creation call back function, which is invoke by RSM before schedule the task instance
362pub fn registry_component(cid:u32,attrs:&component_attrs_t,callback:rsm_new_task)->errcode::RESULT {
363 return rsm_sched::registry_component(cid, attrs, callback)
364}
365
366/// get self component id
367pub fn get_self_cid()->Option<rsm_component_t>{
368 return rsm_sched::get_self_cid();
369}
370
371/// get the sender cid under the message receive context
372pub fn get_sender_cid()->Option<rsm_component_t>{
373 return rsm_sched::get_sender_cid()
374}
375
376///power_on or init_ack, to keep task start order, not implement yet
377pub fn power_on_ack() {
378 return rsm_sched::power_on_ack();
379}
380///send asyn message, normally put into the receiver's message queue
381pub fn send_asyn_msg(dst:&rsm_component_t,msg:rsm_message_t)->errcode::RESULT {
382 return rsm_sched::send_asyn_msg(dst, msg);
383}
384
385pub fn send_asyn_msg_ext<'de,T>(dst:&rsm_component_t,msg_id:u32,body:&T)->errcode::RESULT
386 where T:Sized+Serialize+Deserialize<'de> {
387 let msg=match rsm_message_t::new(msg_id, body) {
388 None=>return errcode::ERROR_ENCODE_MSG,
389 Some(m)=>m,
390 };
391 return rsm_sched::send_asyn_msg(dst, msg);
392}
393
394///send high priority asyn message, this type message is ensure delivery to the component before other normal message
395pub fn send_asyn_priority_msg(dst:&rsm_component_t,msg:rsm_message_t)->errcode::RESULT {
396 return rsm_sched::send_asyn_priority_msg(dst, msg);
397}
398
399///set a timer, loop for **loop_count** times every **dur_msec** milliseconds. if *loop_count* is 0, the timer will not stop util application kill the timer
400pub fn set_timer(dur_msec:u64,loop_count:u64,timer_data:usize)->Option<rsm_timer_id_t>{
401 return rsm_timer::set_timer(dur_msec, loop_count, timer_data);
402}
403
404/// stop the timer, given the timer_id returned by *set_timer* function
405pub fn kill_timer_by_id(timer_id:rsm_timer_id_t)->errcode::RESULT {
406 return rsm_timer::kill_timer_by_id(timer_id);
407}
408
409///create a xlog client instance, then using the instance to output logs
410pub fn new_xlog(module_name:&str)->xlog::xlogger_t {
411 let serv_addr = match unsafe {&gRsmConfig} {
412 None=>SocketAddr::new(IpAddr::from([127,0,0,1]),xlog::LOG_DEF_SERVICE_PORT),
413 Some(c)=>c.log_config.self_addr,
414 };
415 return xlog::xlogger::new_xlogger(module_name,
416 &IpAddr::from([127,0,0,1]), 0,
417 &serv_addr.ip(),serv_addr.port());
418}
419