syslog_rs/a_sync/syslog_async_internal.rs
1/*-
2 * syslog-rs - a syslog client translated from libc to rust
3 *
4 * Copyright 2025 Aleksandr Morozov
5 *
6 * The syslog-rs crate can be redistributed and/or modified
7 * under the terms of either of the following licenses:
8 *
9 * 1. the Mozilla Public License Version 2.0 (the “MPL”) OR
10 *
11 * 2. The MIT License (MIT)
12 *
13 * 3. EUROPEAN UNION PUBLIC LICENCE v. 1.2 EUPL © the European Union 2007, 2016
14 */
15
16
17use std::fmt;
18use std::marker::PhantomData;
19
20use crate::a_sync::syslog_trait::AsyncSyslogApi;
21use crate::formatters::SyslogFormatter;
22use crate::map_error_os;
23use crate::portable;
24use crate::common::*;
25use crate::error::SyRes;
26use crate::socket::TapType;
27use crate::AsyncSyslogDestination;
28
29use super::async_socket::*;
30
31/// A trait which generalize some operations which are different from the
32/// various async providers i.e `smol` or `tokio` or external.
33#[allow(async_fn_in_trait)]
34pub trait AsyncSyslogInternalIO: fmt::Debug + Send + 'static
35{
36 /// Sends a message to syscons device. Usually this is a `/dev/console`
37 /// defined by crate::common::PATH_CONSOLE.
38 ///
39 /// # Arguemnts
40 ///
41 /// * `logstat` - a instance setup [LogStat].
42 ///
43 /// * `msg_payload` - a payload of the syslog message (without headers).
44 async fn send_to_syscons(logstat: LogStat, msg_payload: &str);
45
46 /// Sends a message to stderr device.
47 ///
48 /// # Arguemnts
49 ///
50 /// * `logstat` - a instance setup [LogStat].
51 ///
52 /// * `msg` - a payload of the syslog message (without headers).
53 async fn send_to_stderr(logstat: LogStat, msg: &str);
54
55 /// Sleep the current task for `us` microseconds.
56 ///
57 /// # Arguments
58 ///
59 /// * `us` - microseconds.
60 async fn sleep_micro(us: u64);
61}
62
63/// A trait which generalize the mutex from the std lib's of multiple async executors.
64/// The trait should be implemented on the mutex direclty.
65#[allow(async_fn_in_trait)]
66pub trait AsyncMutex<F: SyslogFormatter, D: AsyncSyslogDestination, DS: AsyncSyslogApi<F, D>>
67{
68 /// A mutex guard type.
69 type MutxGuard<'mux>: AsyncMutexGuard<'mux, F, D, DS> where Self: 'mux;
70
71 /// Creates new mutex instance for type which implements the [AsyncSyslogApi].
72 fn a_new(v: DS) -> Self;
73
74 /// Locks the mutex emmiting the `mutex guard`.
75 async fn a_lock<'mux>(&'mux self) -> Self::MutxGuard<'mux>;
76}
77
78/// A trait which generalize the mutex guarding emited by the mutex from various async executors.
79pub trait AsyncMutexGuard<'mux, F: SyslogFormatter, D: AsyncSyslogDestination, DS: AsyncSyslogApi<F, D>>
80{
81 /// Returns the reference to the inner type of the mutex guard.
82 fn guard(&self) -> &DS;
83
84 /// Returns the mutable reference to the inner type of the mutex guard.
85 fn guard_mut(&mut self) -> &mut DS;
86}
87
88/// Internal structure of the syslog async client.
89#[derive(Debug)]
90pub struct AsyncSyslogInternal<F: SyslogFormatter + Send, D: AsyncSyslogDestination, IO: AsyncSyslogInternalIO>
91{
92 /// An identification i.e program name, thread name
93 logtag: String,
94
95 /// A pid of the program.
96 logpid: String,
97
98 /// Defines how syslog operates
99 logstat: LogStat,
100
101 /// Holds the facility
102 facility: LogFacility,
103
104 /// A logmask
105 logmask: i32,
106
107 /// A stream
108 stream: D::SocketTap,
109
110 /// Phantom for [SyslogFormatter]
111 _p: PhantomData<F>,
112
113 /// Phantom for the [AsyncSyslogInternalIO] which provides writing to console and other IO.
114 _p2: PhantomData<IO>,
115}
116
117
118impl<F: SyslogFormatter + Send, D: AsyncSyslogDestination, IO: AsyncSyslogInternalIO> AsyncSyslogInternal<F, D, IO>
119{
120 /// Creates new instance of [SyslogInternal] which contains all
121 /// client syslog logic.
122 ///
123 /// # Arguments
124 ///
125 /// * `ident` - An optional argument which takes ref to str. If none, the
126 /// ident will be set later. Yje ident will be trucated to 48 UTF8
127 /// chars.
128 ///
129 /// * `logstat` - A [LogStat] flags separated by '|'
130 ///
131 /// * `facility` - A [LogFacility] flag
132 ///
133 /// * `req_tap` - A type of the syslog instance reuired. See [SyslogDestination].
134 ///
135 /// # Returns
136 ///
137 /// A [SyRes] is returned with following:
138 ///
139 /// * [Result::Ok] - with the instance.
140 ///
141 /// * [Result::Err] - with error description.
142 pub(crate)
143 fn new(
144 ident: Option<&str>,
145 logstat: LogStat,
146 facility: LogFacility,
147 req_tap: D
148 ) -> SyRes<Self>
149 {
150 // check if log_facility is invalid
151 let log_facility =
152 if facility.is_empty() == false &&
153 (facility & !LogMask::LOG_FACMASK).is_empty() == true
154 {
155 facility
156 }
157 else
158 {
159 // default facility code
160 LogFacility::LOG_USER
161 };
162
163 let logtag =
164 match ident
165 {
166 Some(r) =>
167 truncate_n(r, RFC_MAX_APP_NAME).to_string(),
168 None =>
169 truncate_n(
170 portable::p_getprogname()
171 .unwrap_or("".to_string())
172 .as_str(),
173 RFC_MAX_APP_NAME
174 )
175 .to_string()
176 };
177
178 let sock = D::SocketTap::new(req_tap)?;
179
180 return Ok(
181 Self
182 {
183 logtag: logtag,
184 logpid: portable::get_pid().to_string(),
185 logstat: logstat,
186 facility: log_facility,
187 logmask: 0xff,
188 stream: sock,
189 _p: PhantomData,
190 _p2: PhantomData,
191 }
192 );
193 }
194
195 #[inline]
196 pub(crate)
197 fn is_logmasked(&self, pri: i32) -> bool
198 {
199 if ((1 << (pri & LogMask::LOG_PRIMASK)) & self.logmask) == 0
200 {
201 return true;
202 }
203
204 return false;
205 }
206
207 /// Returns the type of the socket.
208 #[inline]
209 pub(crate)
210 fn get_taptype(&self) -> TapType
211 {
212 return self.stream.get_type();
213 }
214
215 /*
216 /// Returns the maximum msg size in bytes. (Full msg with headers.)
217 #[inline]
218 pub(crate)
219 fn get_max_msg_size(&self) -> usize
220 {
221 return self.stream.get_max_msg_size();
222 }
223 */
224
225 #[inline]
226 pub(crate)
227 fn set_logtag<L: AsRef<str>>(&mut self, logtag: L, update_pid: bool)
228 {
229 self.logtag =
230 truncate_n(logtag.as_ref(), RFC_MAX_APP_NAME).to_string();
231
232 if update_pid == true
233 {
234 self.logpid = portable::get_pid().to_string();
235 }
236
237 return;
238 }
239
240 /// Disconnects the unix stream from syslog.
241 #[inline]
242 pub(crate) async
243 fn disconnectlog(&mut self) -> SyRes<()>
244 {
245 return
246 self
247 .stream
248 .disconnectlog()
249 .await
250 .map_err(|e| map_error_os!(e, "can not disconnect log properly"));
251 }
252}
253
254impl<F: SyslogFormatter + Send, D: AsyncSyslogDestination, IO: AsyncSyslogInternalIO> AsyncSyslogApi<F, D>
255for AsyncSyslogInternal<F, D, IO>
256{
257 async
258 fn update_tap_data(&mut self, tap_data: D) -> SyRes<()>
259 {
260 let is_con = self.stream.is_connected();
261
262 if is_con == true
263 {
264 self
265 .stream
266 .disconnectlog()
267 .await
268 .map_err(|e|
269 map_error_os!(e, "update_tap_data() can not disconnect log properly")
270 )?;
271 }
272
273 self.stream.update_tap_data(tap_data);
274
275 if is_con == true
276 {
277 // replace with new instance
278 self.stream.connectlog().await?;
279 }
280
281 return Ok(());
282 }
283
284 #[inline]
285 fn change_identity(&mut self, ident: &str)
286 {
287 self.set_logtag(ident, true);
288 }
289
290 async
291 fn reconnect(&mut self) -> SyRes<()>
292 {
293 self.disconnectlog().await?;
294
295 self.connectlog().await?;
296
297 return Ok(());
298 }
299
300 async
301 fn closelog(&mut self) -> SyRes<()>
302 {
303 return self.disconnectlog().await;
304 }
305
306 fn set_logmask(&mut self, logmask: i32) -> i32
307 {
308 let oldmask = self.logmask;
309
310 if logmask != 0
311 {
312 self.logmask = logmask;
313 }
314
315 return oldmask;
316 }
317
318
319 /// Connects unix stream to the syslog and sets up the properties of
320 /// the unix stream.
321 #[inline]
322 async
323 fn connectlog(&mut self) -> SyRes<()>
324 {
325 return self.stream.connectlog().await;
326 }
327
328 /// An internal function which is called by the syslog or vsyslog.
329 async
330 fn vsyslog1(&mut self, mut pri: Priority, fmt: F)
331 {
332 // check for invalid bits
333 if let Err(e) = pri.check_invalid_bits()
334 {
335 IO::send_to_stderr(self.logstat, &e.to_string()).await;
336 }
337
338 /*match check_invalid_bits(&mut pri)
339 {
340 Ok(_) => {},
341 Err(_e) => self.vsyslog1(get_internal_log(), fmt).await
342 }*/
343
344 // check priority against setlogmask
345 if self.is_logmasked(pri.bits()) == true
346 {
347 return;
348 }
349
350 // set default facility if not specified in pri
351 if (pri.bits() & LOG_FACMASK) == 0
352 {
353 pri.set_facility(self.facility);
354 };
355
356 // set PID if needed
357 let msg_pid =
358 if self.logstat.intersects(LogStat::LOG_PID) == true
359 {
360 Some(self.logpid.as_str())
361 }
362 else
363 {
364 None
365 };
366
367 let mut msg_formatted =
368 fmt.vsyslog1_format(D::SocketTap::get_max_msg_size(), pri, &self.logtag, msg_pid);
369
370 // output to stderr if required
371 IO::send_to_stderr(self.logstat, msg_formatted.get_stderr_output()).await;
372
373 if self.stream.is_connected() == false
374 {
375 // open connection
376 match self.connectlog().await
377 {
378 Ok(_) => {},
379 Err(e) =>
380 {
381 IO::send_to_stderr(self.logstat, &e.into_inner() ).await;
382 return;
383 }
384 }
385 }
386
387 let fullmsg = msg_formatted.get_full_msg();
388
389
390 // There are two possible scenarios when send may fail:
391 // 1. syslog temporary unavailable
392 // 2. syslog out of buffer space
393 // If we are connected to priv socket then in case of 1 we reopen connection
394 // and retry once.
395 // If we are connected to unpriv then in case of 2 repeatedly retrying to send
396 // until syslog socket buffer space will be cleared
397
398 loop
399 {
400 match self.stream.send(fullmsg.as_bytes()).await
401 {
402 Ok(_) =>
403 return,
404 Err(err) =>
405 {
406 if self.get_taptype().is_network() == false
407 {
408 #[cfg(target_family = "unix")]
409 {
410 if let Some(nix::libc::ENOBUFS) = err.raw_os_error()
411 {
412 // scenario 2
413 if self.get_taptype().is_priv() == true
414 {
415 break;
416 }
417
418 IO::sleep_micro(1).await;
419 //sleep(Duration::from_micros(1)).await;
420 }
421 else
422 {
423 // scenario 1
424 let _ = self.disconnectlog().await;
425 match self.connectlog().await
426 {
427 Ok(_) => {},
428 Err(_e) => break,
429 }
430
431
432 }
433 }
434
435 #[cfg(target_family = "windows")]
436 {
437 let Ok(werr) = err.downcast::<windows::core::Error>()
438 else
439 {
440 IO::send_to_stderr(self.logstat, "error downcast failed").await;
441 break;
442 };
443
444 IO::send_to_stderr(self.logstat, &werr.message()).await;
445
446 break;
447 }
448
449 // if resend will fail then probably the scn 2 will take place
450 }
451 else
452 {
453 let _ = self.disconnectlog().await;
454 match self.connectlog().await
455 {
456 Ok(_) => {},
457 Err(_e) => break,
458 }
459 }
460 }
461 }
462 } // loop
463
464
465 // If program reached this point then transmission over socket failed.
466 // Try to output message to console
467
468 IO::send_to_syscons(self.logstat, msg_formatted.get_stderr_output()).await;
469 }
470}