1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 382 383 384 385 386 387 388 389 390 391 392 393 394 395 396 397 398 399 400 401 402 403 404 405 406 407 408 409 410 411 412 413 414 415 416 417 418 419
#![allow(non_camel_case_types)]
#![allow(non_snake_case)]
#![allow(non_upper_case_globals)]
//! # RSM
//! RSM = Realtime Software Middleware
//! Introduction
//! ===
//! Realtime system is defined as a system that can response the external request in certain deterministic time. To achieve this goal in generic computer systems, we must adopt a realtime shcedule policy on the software system, and keep from some time-consuming operation such as synchronous I/O operation, memory garbage collection and lock.
//!
//! RSM is a lightweight realtime middleware implementation written in rust, support event-driven, message oriented lock-free programming principle. in RSM, every software module is a **component**, which is normally a Finite State Machine, mainly proccess event loop. Each component can be instantiated to several tasks, and each task mapped to a dedicated **OS thread** and has its own message queue.
//!
//! Developer can set the task's schedule priority and their message queue length respectively,usually based on the service model and performance & latency requirements.
//!
//! RSM is suitable for the following applications:
//! ----
//! - network device control plane, e.g. routing protocol, service control
//! - embedded system application
//! - remote control system
//! - realtime telemetry and instrumentation
//!
//! Programming
//! ===
//!
//! Concept
//! ---
//!
//! each RSM component must implement the **rsm::Runnable** trait and provides a task creation Callback function.
//!
//! the code in *main.rs* is a sample RSM application implementation.
//!
//! pub trait Runnable {
//!
//! fn on_init(&mut self,cid:&rsm_component_t);
//!
//! fn on_timer(&mut self,cid:&rsm_component_t,timer_id:rsm_timer_id_t,timer_data:usize);
//!
//! fn on_message(&mut self,cid:&rsm_component_t,msg_id:rsm_message_id_t,msg:&rsm_message_t);
//!
//! fn on_close(&mut self,cid:&rsm_component_t);
//!
//! }
//!
//! *type rsm_new_task=fn(cid:&rsm_component_t)->&'static mut dyn Runnable*
//!
//!
//! Initialize the RSM
//! ---
//! using *rsm_init* function to init the rsm system, then the applicaition can register their components to RSM.
//!
//! rsm_init_cfg_t is the RSM's configuration file, which is in json format.
//! rsm_init(conf:&config::rsm_init_cfg_t)->errcode::RESULT
//!
//! *pub fn registry_component(cid:u32,attrs:&component_attrs_t,callback:rsm_new_task)->errcode::RESULT*
//!
//! After the component registration is finished, the *start_rsm()* function should be called to running the system.
//!
//!Runtime
//!---
//!every running task can be identified uniquely by **rsm_component_t**
//!
//!task can send message to each other, with normal message or a high priority message
//!*pub fn send_asyn_msg(dst:&rsm_component_t,msg:rsm_message_t)->errcode::RESUL*
//!
//! *pub fn send_asyn_priority_msg(dst:&rsm_component_t,msg:rsm_message_t)->errcode::RESULT*
//!
//! for the receiver side, the application use msg.decode::<T>(v) to restore the message to application defined type
//!
//! RSM also provides a timer service, application can set timer simply by calling **set_timer** function, once the timer is set and expired, rsm task will receive a on_timer event, which is defined in the Runnable trait.
//!
//! *pub fn set_timer(dur_msec:u64,loop_count:u64,timer_data:usize)->Option<rsm_timer_id_t>*
//! *pub fn kill_timer_by_id(timer_id:rsm_timer_id_t)->errcode::RESULT*
//!
//! Diagnostic
//! ===
//! Developer and user can use rest api get running status and statistics
//!
//! Built in api
//! ---
//! help,*curl http://127.0.0.1:12000/rsm/help*
//! get task running status, *curl http://127.0.0.1:12000/rsm/task?1:2*
//! get component configuration,*curl http://127.0.0.1:12000/rsm/component?1*
//!
//! Application defined OAM API
//! ---
//! application Module must implement *OamReqCallBack* function, and invoke *RegisterOamModule* to register self
//! *OamReqCallBack=fn(op:E_RSM_OAM_OP,url:&String,param:&String)->oam_cmd_resp_t*
//!
//! register a module callback, urls is a list of rest api url, the prefix /rsm and id following a "?" are not included
//! *RegisterOamModule(urls:&[String], callback:OamReqCallBack)*
//!
//! Other service& lib function
//! ===
//! xlog service
//! ---
//! xlog service is based on client/server architecture, the client side simple send log message to the server which responsible for log file manipulation, keeping from write disk under the application's context, which is very important for the realtime application.
//!
//! *let log = rsm::new_xlog(module_name:&str)->xlog::xlogger_t;*
//!
//! *log.Errorf(postion, err, logDesc);*
//!
//! Other thread safe algorithm and data structure
//! ---
//! + spin_lock_t, Atomic operation based lock.
//! + AtomicQueue, based on spin_lock
//! + TsIdAllocator, thread safe Id allocator
//! + bitmap
//! + ethernet packet parser
//! + Ip routing table
//! + several other network function and object wrapper
//!
use crate::common::{self,errcode};
use serde::{Deserialize, Serialize};
use std::net::{IpAddr,SocketAddr};
use serde_json;
pub mod task;
pub mod rsm_sched;
pub mod rsm_timer;
pub mod os_timer;
pub mod socket;
pub mod config;
pub mod xlog;
pub mod oam;
const MAX_COMPONENT_NUM:usize = 256;
pub const RSM_MODULE_NAME: &str = "rust_rsm";
///RSM common Type definition
pub type rsm_component_id_t = u32;
pub type rsm_node_id_t = u32;
///timer id type
pub type rsm_timer_id_t = i32;
pub type rsm_message_id_t = u32;
///system & user cid scope definition
pub const RSM_INVALID_CID:u32 = 0;
///start of the CID reserved for system use
pub const RSM_SYSTEM_CID_START:u32 = 1;
///end of the CID reserved for system use
pub const RSM_SYSTEM_CID_END:u32 = 1023;
pub const RSM_USER_CID_START:u32 = 1024;
///maximum instance number per cid
pub const RSM_MAX_INST_PER_CID:usize=16;
///allowed max message queue len
pub const RSM_MAX_QUEUE_LEN:usize = 16384;
///allowed max message length
pub const RSM_MAX_MESSAGE_LEN:usize = 64000;
pub const RSM_INVALID_TIMER_ID:i32=-1;
/// describe the task schedule priority, the REALTIME Priority is mapped to Linux/Windows Realtime priority
#[derive(Copy,Clone,PartialEq,Debug,Eq,Serialize)]
pub enum E_RSM_TASK_PRIORITY {
THREAD_PRI_LOW = 0,
THREAD_PRI_NORMAL = 1,
THREAD_PRI_HIGH = 2,
THREAD_PRI_REALTIME = 3,
THREAD_PRI_REALTIME_HIGH = 4,
THREAD_PRI_REALTIME_HIGHEST = 5,
}
/// identifier for a software module running instance, include the software module unique id and an instance id
/// in RSM, every software module running instance(component instance or task) is a Finite State Machine(FSM),
/// which mapped to an OS native thread, process message event loop
#[derive(Eq,PartialEq,Hash,Clone,Debug,Copy)]
pub struct rsm_component_t {
cid:rsm_component_id_t,
node_id:rsm_node_id_t,
inst_id:usize,
}
impl rsm_component_t {
pub fn new(id:rsm_component_id_t,node_id:u32,inst_id:usize)->Self {
return Self {
cid:id,
node_id,
inst_id,
}
}
pub fn new_zero()->Self {
return Self { cid: 0, node_id: 0, inst_id: 0 }
}
pub fn get_cid(&self)->rsm_component_id_t {
self.cid
}
pub fn get_inst_id(&self)->usize {
self.inst_id
}
}
impl std::fmt::Display for rsm_component_t {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "(node_id={}, cid={},inst_id={})", self.node_id,self.cid, self.inst_id)
}
}
///socket event definition
pub type SOCKET_EVENT=u32;
///socket is readable
pub const SOCK_EVENT_READ:SOCKET_EVENT= 1;
///socket is writable
pub const SOCK_EVENT_WRITE:SOCKET_EVENT= 1<<1;
///new socket created, usually a tcp client connection
pub const SOCK_EVENT_NEW:SOCKET_EVENT= 1<<2;
//Error Connection
pub const SOCK_EVENT_ERR:SOCKET_EVENT= 1<<3;
///socket has been closed by remote peer
pub const SOCK_EVENT_CLOSE:SOCKET_EVENT= 1<<4;
#[derive(Clone,Debug,Serialize,Deserialize)]
pub struct rsm_socket_event_t {
pub socket_id:i32,
pub sock_type:socket::SOCKET_TYPE,
pub event:SOCKET_EVENT,
}
///Task create callback function, which must return a valid object reference implement **Runnale** trait
pub type rsm_new_task=fn(cid:&rsm_component_t)->&'static mut dyn Runnable;
///Component must implement the Runnable Trait
pub trait Runnable {
///task init, called first when the task instance is created
fn on_init(&mut self,cid:&rsm_component_t);
/// called when a timer expiry event occured, timer_id indicate which timer fired
fn on_timer(&mut self,cid:&rsm_component_t,timer_id:rsm_timer_id_t,timer_data:usize);
/// socket event, if the task use rsm socket to send/recv message
/// upon recv this message, task should use correspondant Upd/Tcp/Raw Socket to recv packet, util no more packet
/// rsm automatically accept the tcp connection request from client, the notify the app, app can close the socket to reject the connection
fn on_socket_event(&mut self,cid:&rsm_component_t,event:rsm_socket_event_t);
///an ordinary message received, the app should call msg.decode method to get original data structure
fn on_message(&mut self,cid:&rsm_component_t,msg_id:rsm_message_id_t,msg:&rsm_message_t);
//return true, if component has been initialized
fn is_inited(&self)->bool;
///task has been destroyed, reserved for future use
fn on_close(&mut self,cid:&rsm_component_t);
}
/// describe the component attribute while register to the RSM
#[derive(Eq,PartialEq,Clone,Serialize)]
pub struct component_attrs_t {
pub cid:rsm_component_id_t,
pub name:String,
pub inst_num:usize, //实例数量
pub qlen:usize,
pub priority:E_RSM_TASK_PRIORITY,
pub need_init_ack:bool,
}
impl component_attrs_t {
pub fn new(cid:&rsm_component_id_t,name:&str,inst_num:usize,qlen:usize,prio:E_RSM_TASK_PRIORITY,need_init_ack:bool)->Self {
return Self {
cid:cid.clone(),
name:String::from(name),
inst_num:inst_num, //实例数量
qlen:qlen,
priority:prio,
need_init_ack:need_init_ack,
}
}
}
///begin of the rsm message id using by system
pub const RSM_SYS_MESSAGE_ID_START:u32 = 1;
///end of the rsm message id using by system
pub const RSM_SYS_MESSAGE_ID_END:u32 = 8191;
///user message ID start, application should use message id large than this value
pub const RSM_USER_MESSAGE_ID_START:u32 = 8192;
pub const RSM_INVALID_MESSAGE_ID:u32 = 0;
///predefined rsm system message for inner use, application should not use these message id
pub const RSM_MSG_ID_MASTER_POWER_ON:u32 = 1;
pub const RSM_MSG_ID_SLAVE_POWER_ON:u32 = 2;
pub const RSM_MSG_ID_POWER_ON_ACK:u32 = 3;
pub const RSM_MSG_ID_POWER_OFF:u32 = 4;
pub const RSM_MSG_ID_TIMER:u32 = 10;
pub const RSM_MSG_ID_SOCKET:u32 = 12;
///message object
#[derive(Clone,Debug)]
pub struct rsm_message_t {
msg_id:u32,
timer_id:rsm_timer_id_t,
timer_data:usize,
sender:rsm_component_t,
msg_body:String,
}
impl rsm_message_t {
pub fn new<'de,T>(msg_id:rsm_message_id_t,body:&T)->Option<rsm_message_t>
where T:Sized+Serialize+Deserialize<'de> {
let msg_body = match serde_json::to_string(body) {
Ok(s)=>s,
Err(_)=>return None,
};
let sender = match get_self_cid() {
None=>rsm_component_t::new_zero(),
Some(c)=>c,
};
let msg=Self {
msg_id:msg_id,
timer_id:0,
timer_data:0,
sender:sender,
msg_body:msg_body,
};
return Some(msg);
}
pub(crate) fn new_timer_msg(timer_id:rsm_timer_id_t,timer_data:usize)->Option<rsm_message_t> {
let sender = match get_self_cid() {
None=>rsm_component_t::new_zero(),
Some(c)=>c,
};
let msg=Self {
msg_id:RSM_MSG_ID_TIMER,
timer_id:timer_id,
timer_data:timer_data,
sender:sender,
msg_body:String::default(),
};
return Some(msg);
}
/// on the receiving side, using decode to restore the original data format
pub fn decode<'a,T>(&'a self)->Option<T>
where T:Deserialize<'a> {
match serde_json::from_slice::<T>(self.msg_body.as_bytes()) {
Ok(v)=>Some(v),
Err(_)=>None,
}
}
}
static mut gRsmConfig:Option<config::rsm_init_cfg_t>=None;
///initialize rsm subsystem, which should be called before register any component
pub fn rsm_init(conf:&config::rsm_init_cfg_t)->errcode::RESULT {
unsafe {
if gRsmConfig.is_some() {
return errcode::ERROR_ALREADY_EXIST
}
gRsmConfig=Some(conf.clone());
}
oam::init_oam(&conf.oam_server_addr, &conf.log_config.self_addr);
rsm_sched::init_scheduler(conf.max_component_num);
rsm_timer::init_timer();
//let mut log_conf = xlog::log_service_config_t::new_default();
xlog::xlog_server::InitLogService(&conf.log_config);
socket::socketpool::init_socket_pool();
errcode::RESULT_SUCCESS
}
///after application initialize RSM and register all their running component, then invoke start_rsm
pub fn start_rsm() {
println!("Start RSM, current={}",common::format_datetime(&std::time::SystemTime::now()));
std::thread::spawn(|| rsm_timer::start_timer_thread());
std::thread::spawn(|| rsm_sched::run());
}
///Register a component to RSM, with the configuration is specified by attrs parameter
/// callback is a TASK creation call back function, which is invoke by RSM before schedule the task instance
pub fn registry_component(cid:u32,attrs:&component_attrs_t,callback:rsm_new_task)->errcode::RESULT {
return rsm_sched::registry_component(cid, attrs, callback)
}
/// get self component id
pub fn get_self_cid()->Option<rsm_component_t>{
return rsm_sched::get_self_cid();
}
/// get the sender cid under the message receive context
pub fn get_sender_cid()->Option<rsm_component_t>{
return rsm_sched::get_sender_cid()
}
///power_on or init_ack, to keep task start order, not implement yet
pub fn power_on_ack() {
return rsm_sched::power_on_ack();
}
///send asyn message, normally put into the receiver's message queue
pub fn send_asyn_msg(dst:&rsm_component_t,msg:rsm_message_t)->errcode::RESULT {
return rsm_sched::send_asyn_msg(dst, msg);
}
pub fn send_asyn_msg_ext<'de,T>(dst:&rsm_component_t,msg_id:u32,body:&T)->errcode::RESULT
where T:Sized+Serialize+Deserialize<'de> {
let msg=match rsm_message_t::new(msg_id, body) {
None=>return errcode::ERROR_ENCODE_MSG,
Some(m)=>m,
};
return rsm_sched::send_asyn_msg(dst, msg);
}
///send high priority asyn message, this type message is ensure delivery to the component before other normal message
pub fn send_asyn_priority_msg(dst:&rsm_component_t,msg:rsm_message_t)->errcode::RESULT {
return rsm_sched::send_asyn_priority_msg(dst, msg);
}
///set a timer, loop for **loop_count** times every **dur_msec** milliseconds. if *loop_count* is 0, the timer will not stop util application kill the timer
pub fn set_timer(dur_msec:u64,loop_count:u64,timer_data:usize)->Option<rsm_timer_id_t>{
return rsm_timer::set_timer(dur_msec, loop_count, timer_data);
}
/// stop the timer, given the timer_id returned by *set_timer* function
pub fn kill_timer_by_id(timer_id:rsm_timer_id_t)->errcode::RESULT {
return rsm_timer::kill_timer_by_id(timer_id);
}
///create a xlog client instance, then using the instance to output logs
pub fn new_xlog(module_name:&str)->xlog::xlogger_t {
let serv_addr = match unsafe {&gRsmConfig} {
None=>SocketAddr::new(IpAddr::from([127,0,0,1]),xlog::LOG_DEF_SERVICE_PORT),
Some(c)=>c.log_config.self_addr,
};
return xlog::xlogger::new_xlogger(module_name,
&IpAddr::from([127,0,0,1]), 0,
&serv_addr.ip(),serv_addr.port());
}