syslog_rs/a_sync/syslog_async_internal.rs
1/*-
2 * syslog-rs - a syslog client translated from libc to rust
3 *
4 * Copyright 2025 Aleksandr Morozov
5 *
6 * The syslog-rs crate can be redistributed and/or modified
7 * under the terms of either of the following licenses:
8 *
9 * 1. the Mozilla Public License Version 2.0 (the “MPL”) OR
10 *
11 * 2. The MIT License (MIT)
12 *
13 * 3. EUROPEAN UNION PUBLIC LICENCE v. 1.2 EUPL © the European Union 2007, 2016
14 */
15
16
17use std::fmt;
18use std::marker::PhantomData;
19
20use crate::a_sync::syslog_trait::AsyncSyslogApi;
21use crate::formatters::SyslogFormatter;
22use crate::map_error_os;
23use crate::portable;
24use crate::common::*;
25use crate::error::SyRes;
26use crate::socket::TapType;
27use crate::AsyncSyslogDestination;
28
29use super::async_socket::*;
30
31/// A trait which generalize some operations which are different from the
32/// various async providers i.e `smol` or `tokio` or external.
33#[allow(async_fn_in_trait)]
34pub trait AsyncSyslogInternalIO: fmt::Debug + Send + 'static
35{
36 /// Sends a message to syscons device. Usually this is a `/dev/console`
37 /// defined by crate::common::PATH_CONSOLE.
38 ///
39 /// # Arguemnts
40 ///
41 /// * `logstat` - a instance setup [LogStat].
42 ///
43 /// * `msg_payload` - a payload of the syslog message (without headers).
44 async fn send_to_syscons(logstat: LogStat, msg_payload: &str);
45
46 /// Sends a message to stderr device.
47 ///
48 /// # Arguemnts
49 ///
50 /// * `logstat` - a instance setup [LogStat].
51 ///
52 /// * `msg` - a payload of the syslog message (without headers).
53 async fn send_to_stderr(logstat: LogStat, msg: &str);
54
55 /// Sleep the current task for `us` microseconds.
56 ///
57 /// # Arguments
58 ///
59 /// * `us` - microseconds.
60 async fn sleep_micro(us: u64);
61}
62
63/// A trait which generalize the mutex from the std lib's of multiple async executors.
64/// The trait should be implemented on the mutex direclty.
65#[allow(async_fn_in_trait)]
66pub trait AsyncMutex<F: SyslogFormatter, D: AsyncSyslogDestination, DS: AsyncSyslogApi<F, D>>
67{
68 /// A mutex guard type.
69 type MutxGuard<'mux>: AsyncMutexGuard<'mux, F, D, DS> where Self: 'mux;
70
71 /// Creates new mutex instance for type which implements the [AsyncSyslogApi].
72 fn a_new(v: DS) -> Self;
73
74 /// Locks the mutex emmiting the `mutex guard`.
75 async fn a_lock<'mux>(&'mux self) -> Self::MutxGuard<'mux>;
76}
77
78/// A trait which generalize the mutex guarding emited by the mutex from various async executors.
79pub trait AsyncMutexGuard<'mux, F: SyslogFormatter, D: AsyncSyslogDestination, DS: AsyncSyslogApi<F, D>>
80{
81 /// Returns the reference to the inner type of the mutex guard.
82 fn guard(&self) -> &DS;
83
84 /// Returns the mutable reference to the inner type of the mutex guard.
85 fn guard_mut(&mut self) -> &mut DS;
86}
87
88/// Internal structure of the syslog async client.
89#[derive(Debug)]
90pub struct AsyncSyslogInternal<F: SyslogFormatter + Send, D: AsyncSyslogDestination, IO: AsyncSyslogInternalIO>
91{
92 /// An identification i.e program name, thread name
93 logtag: String,
94
95 /// A pid of the program.
96 logpid: String,
97
98 /// Defines how syslog operates
99 logstat: LogStat,
100
101 /// Holds the facility
102 facility: LogFacility,
103
104 /// A logmask
105 logmask: i32,
106
107 /// A stream
108 stream: D::SocketTap,
109
110 /// Phantom for [SyslogFormatter]
111 _p: PhantomData<F>,
112
113 /// Phantom for the [AsyncSyslogInternalIO] which provides writing to console and other IO.
114 _p2: PhantomData<IO>,
115}
116
117
118impl<F: SyslogFormatter + Send, D: AsyncSyslogDestination, IO: AsyncSyslogInternalIO> AsyncSyslogInternal<F, D, IO>
119{
120 /// Creates new instance of [SyslogInternal] which contains all
121 /// client syslog logic.
122 ///
123 /// # Arguments
124 ///
125 /// * `ident` - An optional argument which takes ref to str. If none, the
126 /// ident will be set later. Yje ident will be trucated to 48 UTF8
127 /// chars.
128 ///
129 /// * `logstat` - A [LogStat] flags separated by '|'
130 ///
131 /// * `facility` - A [LogFacility] flag
132 ///
133 /// * `req_tap` - A type of the syslog instance reuired. See [SyslogDestination].
134 ///
135 /// # Returns
136 ///
137 /// A [SyRes] is returned with following:
138 ///
139 /// * [Result::Ok] - with the instance.
140 ///
141 /// * [Result::Err] - with error description.
142 pub(crate)
143 fn new(
144 ident: Option<&str>,
145 logstat: LogStat,
146 facility: LogFacility,
147 req_tap: D
148 ) -> SyRes<Self>
149 {
150 // check if log_facility is invalid
151 let log_facility =
152 if facility.is_empty() == false &&
153 (facility & !LogMask::LOG_FACMASK).is_empty() == true
154 {
155 facility
156 }
157 else
158 {
159 // default facility code
160 LogFacility::LOG_USER
161 };
162
163 let logtag =
164 match ident
165 {
166 Some(r) =>
167 truncate_n(r, RFC_MAX_APP_NAME).to_string(),
168 None =>
169 truncate_n(
170 portable::p_getprogname()
171 .unwrap_or("".to_string())
172 .as_str(),
173 RFC_MAX_APP_NAME
174 )
175 .to_string()
176 };
177
178 let sock = D::SocketTap::new(req_tap)?;
179
180 return Ok(
181 Self
182 {
183 logtag: logtag,
184 logpid: portable::get_pid().to_string(),
185 logstat: logstat,
186 facility: log_facility,
187 logmask: 0xff,
188 stream: sock,
189 _p: PhantomData,
190 _p2: PhantomData,
191 }
192 );
193 }
194
195 #[inline]
196 pub(crate)
197 fn is_logmasked(&self, pri: Priority) -> bool
198 {
199 return ((1 << (pri & LogMask::LOG_PRIMASK)) & self.logmask) == 0;
200 }
201
202 /// Returns the type of the socket.
203 #[inline]
204 pub(crate)
205 fn get_taptype(&self) -> TapType
206 {
207 return self.stream.get_type();
208 }
209
210 /*
211 /// Returns the maximum msg size in bytes. (Full msg with headers.)
212 #[inline]
213 pub(crate)
214 fn get_max_msg_size(&self) -> usize
215 {
216 return self.stream.get_max_msg_size();
217 }
218 */
219
220 #[inline]
221 pub(crate)
222 fn set_logtag<L: AsRef<str>>(&mut self, logtag: L, update_pid: bool)
223 {
224 self.logtag =
225 truncate_n(logtag.as_ref(), RFC_MAX_APP_NAME).to_string();
226
227 if update_pid == true
228 {
229 self.logpid = portable::get_pid().to_string();
230 }
231
232 return;
233 }
234
235 /// Disconnects the unix stream from syslog.
236 #[inline]
237 pub(crate) async
238 fn disconnectlog(&mut self) -> SyRes<()>
239 {
240 return
241 self
242 .stream
243 .disconnectlog()
244 .await
245 .map_err(|e| map_error_os!(e, "can not disconnect log properly"));
246 }
247}
248
249impl<F: SyslogFormatter + Send, D: AsyncSyslogDestination, IO: AsyncSyslogInternalIO> AsyncSyslogApi<F, D>
250for AsyncSyslogInternal<F, D, IO>
251{
252 async
253 fn update_tap_data(&mut self, tap_data: D) -> SyRes<()>
254 {
255 let is_con = self.stream.is_connected();
256
257 if is_con == true
258 {
259 self
260 .stream
261 .disconnectlog()
262 .await
263 .map_err(|e|
264 map_error_os!(e, "update_tap_data() can not disconnect log properly")
265 )?;
266 }
267
268 self.stream.update_tap_data(tap_data);
269
270 if is_con == true
271 {
272 // replace with new instance
273 self.stream.connectlog().await?;
274 }
275
276 return Ok(());
277 }
278
279 #[inline]
280 fn change_identity(&mut self, ident: &str)
281 {
282 self.set_logtag(ident, true);
283 }
284
285 async
286 fn reconnect(&mut self) -> SyRes<()>
287 {
288 self.disconnectlog().await?;
289
290 self.connectlog().await?;
291
292 return Ok(());
293 }
294
295 async
296 fn closelog(&mut self) -> SyRes<()>
297 {
298 return self.disconnectlog().await;
299 }
300
301 fn set_logmask(&mut self, logmask: i32) -> i32
302 {
303 let oldmask = self.logmask;
304
305 if logmask != 0
306 {
307 self.logmask = logmask;
308 }
309
310 return oldmask;
311 }
312
313
314 /// Connects unix stream to the syslog and sets up the properties of
315 /// the unix stream.
316 #[inline]
317 async
318 fn connectlog(&mut self) -> SyRes<()>
319 {
320 return self.stream.connectlog().await;
321 }
322
323 /// An internal function which is called by the syslog or vsyslog.
324 async
325 fn vsyslog1(&mut self, pri: Priority, fmt: F)
326 {
327 /*
328 // check for invalid bits
329 if let Err(e) = pri.check_invalid_bits()
330 {
331 IO::send_to_stderr(self.logstat, &e.to_string()).await;
332 }
333 */
334
335 /*match check_invalid_bits(&mut pri)
336 {
337 Ok(_) => {},
338 Err(_e) => self.vsyslog1(get_internal_log(), fmt).await
339 }*/
340
341 // check priority against setlogmask
342 if self.is_logmasked(pri) == true
343 {
344 return;
345 }
346
347 // set default facility if not specified in pri
348 let pri_fac = SyslogMsgPriFac::set_facility(pri, self.facility);
349
350 // set PID if needed
351 let msg_pid =
352 if self.logstat.intersects(LogStat::LOG_PID) == true
353 {
354 Some(self.logpid.as_str())
355 }
356 else
357 {
358 None
359 };
360
361 let mut msg_formatted =
362 fmt.vsyslog1_format(D::SocketTap::get_max_msg_size(), pri_fac, &self.logtag, msg_pid);
363
364 // output to stderr if required
365 IO::send_to_stderr(self.logstat, msg_formatted.get_stderr_output()).await;
366
367 if self.stream.is_connected() == false
368 {
369 // open connection
370 match self.connectlog().await
371 {
372 Ok(_) => {},
373 Err(e) =>
374 {
375 IO::send_to_stderr(self.logstat, &e.into_inner() ).await;
376 return;
377 }
378 }
379 }
380
381 let fullmsg = msg_formatted.get_full_msg();
382
383
384 // There are two possible scenarios when send may fail:
385 // 1. syslog temporary unavailable
386 // 2. syslog out of buffer space
387 // If we are connected to priv socket then in case of 1 we reopen connection
388 // and retry once.
389 // If we are connected to unpriv then in case of 2 repeatedly retrying to send
390 // until syslog socket buffer space will be cleared
391
392 loop
393 {
394 match self.stream.send(fullmsg.as_bytes()).await
395 {
396 Ok(_) =>
397 return,
398 Err(err) =>
399 {
400 if self.get_taptype().is_network() == false
401 {
402 #[cfg(target_family = "unix")]
403 {
404 if let Some(nix::libc::ENOBUFS) = err.raw_os_error()
405 {
406 // scenario 2
407 if self.get_taptype().is_priv() == true
408 {
409 break;
410 }
411
412 IO::sleep_micro(1).await;
413 //sleep(Duration::from_micros(1)).await;
414 }
415 else
416 {
417 // scenario 1
418 let _ = self.disconnectlog().await;
419 match self.connectlog().await
420 {
421 Ok(_) => {},
422 Err(_e) => break,
423 }
424
425
426 }
427 }
428
429 #[cfg(target_family = "windows")]
430 {
431 let Ok(werr) = err.downcast::<windows::core::Error>()
432 else
433 {
434 IO::send_to_stderr(self.logstat, "error downcast failed").await;
435 break;
436 };
437
438 IO::send_to_stderr(self.logstat, &werr.message()).await;
439
440 break;
441 }
442
443 // if resend will fail then probably the scn 2 will take place
444 }
445 else
446 {
447 let _ = self.disconnectlog().await;
448 match self.connectlog().await
449 {
450 Ok(_) => {},
451 Err(_e) => break,
452 }
453 }
454 }
455 }
456 } // loop
457
458
459 // If program reached this point then transmission over socket failed.
460 // Try to output message to console
461
462 IO::send_to_syscons(self.logstat, msg_formatted.get_stderr_output()).await;
463 }
464}