1#![allow(non_snake_case)]
2#[macro_use]
23extern crate log;
24extern crate libc;
25extern crate num_cpus;
26extern crate threadpool;
27
28use std::error;
29use std::ffi::CStr;
30use std::ffi::CString;
31use std::fmt;
32use std::os::raw::{c_int, c_ulonglong};
33use std::result;
34use std::sync::mpsc;
35use std::sync::Arc;
36use std::sync::Mutex;
37
38use threadpool::ThreadPool;
39
40thread_local!(
41 static POOL: ThreadPool = ThreadPool::new(num_cpus::get())
42);
43
44pub(crate) mod ffi;
45pub(crate) mod mac;
46
47use ffi::catClientDestroy;
48use ffi::catClientInitWithConfig;
49use ffi::catVersion;
50use ffi::newTransaction;
51use ffi::CatClientConfig;
52use ffi::DEFAULT_CCAT_CONFIG;
53
/// Errors reported by this crate.
#[derive(Clone, Debug)]
pub enum CatError {
    /// Returned by `CatClient::init` when the underlying client could not be initialised.
    CatClientInitError,
}
58
59impl error::Error for CatError {
60 fn description(&self) -> &str {
61 "cat client init failed!"
62 }
63}
64
65impl fmt::Display for CatError {
66 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
67 match *self {
68 CatError::CatClientInitError => write!(f, "CatClientInitError"),
69 }
70 }
71}
72
73type Result<T> = result::Result<T, CatError>;
74
75pub struct CatClient {
77 appkey: String,
79 config: CatClientConfig,
81}
82
83impl CatClient {
84 pub fn new<T: ToString>(appkey: T) -> Self {
91 CatClient {
92 appkey: appkey.to_string(),
93 config: unsafe { DEFAULT_CCAT_CONFIG },
94 }
95 }
96
97 pub fn config(&mut self, config: &mut CatClientConfig) -> &mut Self {
99 self.config = *config;
100 self
101 }
102
103 pub fn init(&mut self) -> Result<&mut Self> {
105 let rc = unsafe {
106 catClientInitWithConfig(
107 CString::new(self.appkey.clone()).unwrap().as_ptr(),
108 &mut self.config,
109 )
110 };
111
112 if rc == 0 {
113 error!("{}", CatError::CatClientInitError);
114 Err(CatError::CatClientInitError)
115 } else {
116 Ok(self)
117 }
118 }
119
120 pub fn destroy(&self) {
122 warn!("cat client is being destroyed!");
123 unsafe { catClientDestroy() };
124 }
125
126 pub fn version(&self) -> &str {
128 unsafe { CStr::from_ptr(catVersion()).to_str().unwrap() }
129 }
130}
131
/// Messages sent from a `CatTransaction` handle to its worker job.
pub enum CatMessage {
    /// Log an event; the fields are, in order: type, name, status, data.
    LogEvent(String, String, String, String),
    /// Carries a name; the worker currently ignores this message.
    Transaction(String),
    /// Ask the worker to stop receiving and complete the transaction.
    CompleteThis,
}
137
138pub struct CatTransaction {
139 sender: mpsc::Sender<CatMessage>,
140 open: Arc<Mutex<bool>>,
141}
142
143impl CatTransaction {
144 pub fn new<T: ToString>(_type: T, _name: T) -> Self {
145 let (sender, receiver) = mpsc::channel::<CatMessage>();
146 let _type = _type.to_string();
147 let _name = _name.to_string();
148 let _open = Arc::new(Mutex::new(true));
149 let _open_keep = _open.clone();
150 POOL.with(|pool| {
151 pool.execute(move || {
152 debug!("create a new transaction: {} / {}", _type, _name);
153 let tr = unsafe { newTransaction(c!(_type.clone()), c!(_name)) };
154
155 if tr.is_null() {
156 error!("create transaction failed!");
157 panic!("create transaction failed!")
158 } else {
159 'trans: loop {
161 match receiver.recv() {
162 Ok(message) => {
163 match message {
164 CatMessage::Transaction(_name) => {}
166
167 CatMessage::LogEvent(type_, name, status, data) => {
168 logEvent(type_, name, status, data)
169 }
170
171 CatMessage::CompleteThis => {
172 break 'trans;
173 }
174 }
175 }
176 Err(err) => {
177 error!("receive job failed, err: {}", err);
178 break 'trans;
179 }
180 }
181 }
182
183 let _open_guard = _open.clone();
184 let mut v = _open_guard.try_lock().unwrap();
185 *v = false;
186
187 if let Some(complete) = unsafe { (*tr).complete } {
188 debug!("complete this transaction");
189 unsafe {
190 complete(tr);
191 };
192 } else {
193 error!("transaction's complete method is missing");
194 }
195 }
196 });
197 });
198 CatTransaction {
199 sender,
200 open: _open_keep,
201 }
202 }
203
204 pub fn complete(&mut self) {
205 let _open_guard = self.open.clone();
206 if *_open_guard.try_lock().unwrap() {
207 self.sender
208 .send(CatMessage::CompleteThis)
209 .map_err(|e| {
210 error!("complete transaction error: {}", e);
211 })
212 .unwrap()
213 } else {
214 warn!("complete a closed transaction");
215 }
216 }
217
218 pub fn log<T: ToString>(&mut self, type_: T, name: T, status: T, data: T) {
219 let _open_guard = self.open.clone();
220 if *_open_guard.try_lock().unwrap() {
221 match self
222 .sender
223 .send(CatMessage::LogEvent(
224 type_.to_string(),
225 name.to_string(),
226 status.to_string(),
227 data.to_string(),
228 ))
229 .map_err(|e| {
230 error!("log event error: {}", e);
231 }) {
232 Ok(_) => {}
233 Err(e) => error!("log error: {:?}", e),
234 }
235 } else {
236 warn!("log event on a closed transaction");
237 }
238 }
239}
240
241pub fn logEvent<S: ToString>(type_: S, name_: S, status: S, data: S) {
256 unsafe {
257 ffi::logEvent(
258 c!(type_.to_string()),
259 c!(name_.to_string()),
260 c!(status.to_string()),
261 c!(data.to_string()),
262 )
263 }
264}
265
266pub fn newHeartBeat<S: ToString>(_type: S, _name: S) {
267 info!(
268 "start a new heart beat: {} {}",
269 _type.to_string(),
270 _name.to_string(),
271 );
272 unsafe {
273 ffi::newHeartBeat(c!(_type.to_string()), c!(_name.to_string()));
274 }
275}
276
277pub fn logMetricForCount<S: ToString>(name: S, quantity: i32) {
282 info!("logMetricForCount: {} {}", name.to_string(), quantity);
283
284 unsafe {
285 ffi::logMetricForCount(c!(name.to_string()), quantity as c_int);
286 }
287}
288
289pub fn logMetricForDuration<S: ToString>(name: S, duration: u64) {
294 info!("logMetricForDuration: {} {}", name.to_string(), duration);
295 unsafe {
296 ffi::logMetricForDuration(c!(name.to_string()), duration as c_ulonglong);
297 }
298}