syslog_rs/a_sync/syslog_async_internal.rs
1/*-
2 * syslog-rs - a syslog client translated from libc to rust
3 *
4 * Copyright 2025 Aleksandr Morozov
5 *
6 * The syslog-rs crate can be redistributed and/or modified
7 * under the terms of either of the following licenses:
8 *
9 * 1. the Mozilla Public License Version 2.0 (the “MPL”) OR
10 *
11 * 2. The MIT License (MIT)
12 *
13 * 3. EUROPEAN UNION PUBLIC LICENCE v. 1.2 EUPL © the European Union 2007, 2016
14 */
15
16
17use std::borrow::Cow;
18use std::fmt;
19use std::marker::PhantomData;
20
21use nix::libc;
22
23use crate::a_sync::syslog_trait::AsyncSyslogApi;
24use crate::formatters::SyslogFormatter;
25use crate::map_error_os;
26use crate::portable;
27use crate::common::*;
28use crate::error::SyRes;
29use crate::socket::TapType;
30use crate::AsyncSyslogDestination;
31
32use super::async_socket::*;
33
34/// A trait which generalize some operations which are different from the
35/// various async providers i.e `smol` or `tokio` or external.
36#[allow(async_fn_in_trait)]
37pub trait AsyncSyslogInternalIO: fmt::Debug + Send + 'static
38{
39 /// Sends a message to syscons device. Usually this is a `/dev/console`
40 /// defined by crate::common::PATH_CONSOLE.
41 ///
42 /// # Arguemnts
43 ///
44 /// * `logstat` - a instance setup [LogStat].
45 ///
46 /// * `msg_payload` - a payload of the syslog message (without headers).
47 async fn send_to_syscons(logstat: LogStat, msg_payload: &str);
48
49 /// Sends a message to stderr device.
50 ///
51 /// # Arguemnts
52 ///
53 /// * `logstat` - a instance setup [LogStat].
54 ///
55 /// * `msg` - a payload of the syslog message (without headers).
56 async fn send_to_stderr(logstat: LogStat, msg: &str);
57
58 /// Sleep the current task for `us` microseconds.
59 ///
60 /// # Arguments
61 ///
62 /// * `us` - microseconds.
63 async fn sleep_micro(us: u64);
64}
65
66/// A trait which generalize the mutex from the std lib's of multiple async executors.
67/// The trait should be implemented on the mutex direclty.
68#[allow(async_fn_in_trait)]
69pub trait AsyncMutex<F: SyslogFormatter, D: AsyncSyslogDestination, DS: AsyncSyslogApi<F, D>>
70{
71 /// A mutex guard type.
72 type MutxGuard<'mux>: AsyncMutexGuard<'mux, F, D, DS> where Self: 'mux;
73
74 /// Creates new mutex instance for type which implements the [AsyncSyslogApi].
75 fn a_new(v: DS) -> Self;
76
77 /// Locks the mutex emmiting the `mutex guard`.
78 async fn a_lock<'mux>(&'mux self) -> Self::MutxGuard<'mux>;
79}
80
81/// A trait which generalize the mutex guarding emited by the mutex from various async executors.
82pub trait AsyncMutexGuard<'mux, F: SyslogFormatter, D: AsyncSyslogDestination, DS: AsyncSyslogApi<F, D>>
83{
84 /// Returns the reference to the inner type of the mutex guard.
85 fn guard(&self) -> &DS;
86
87 /// Returns the mutable reference to the inner type of the mutex guard.
88 fn guard_mut(&mut self) -> &mut DS;
89}
90
91/// Internal structure of the syslog async client.
92#[derive(Debug)]
93pub struct AsyncSyslogInternal<F: SyslogFormatter + Send, D: AsyncSyslogDestination, IO: AsyncSyslogInternalIO>
94{
95 /// An identification i.e program name, thread name
96 logtag: String,
97
98 /// A pid of the program.
99 logpid: String,
100
101 /// Defines how syslog operates
102 logstat: LogStat,
103
104 /// Holds the facility
105 facility: LogFacility,
106
107 /// A logmask
108 logmask: i32,
109
110 /// A stream
111 stream: D::SocketTap,
112
113 /// Phantom for [SyslogFormatter]
114 _p: PhantomData<F>,
115
116 /// Phantom for the [AsyncSyslogInternalIO] which provides writing to console and other IO.
117 _p2: PhantomData<IO>,
118}
119
120
121impl<F: SyslogFormatter + Send, D: AsyncSyslogDestination, IO: AsyncSyslogInternalIO> AsyncSyslogInternal<F, D, IO>
122{
123 /// Creates new instance of [SyslogInternal] which contains all
124 /// client syslog logic.
125 ///
126 /// # Arguments
127 ///
128 /// * `ident` - An optional argument which takes ref to str. If none, the
129 /// ident will be set later. Yje ident will be trucated to 48 UTF8
130 /// chars.
131 ///
132 /// * `logstat` - A [LogStat] flags separated by '|'
133 ///
134 /// * `facility` - A [LogFacility] flag
135 ///
136 /// * `req_tap` - A type of the syslog instance reuired. See [SyslogDestination].
137 ///
138 /// # Returns
139 ///
140 /// A [SyRes] is returned with following:
141 ///
142 /// * [Result::Ok] - with the instance.
143 ///
144 /// * [Result::Err] - with error description.
145 pub(crate)
146 fn new(
147 ident: Option<&str>,
148 logstat: LogStat,
149 facility: LogFacility,
150 req_tap: D
151 ) -> SyRes<Self>
152 {
153 // check if log_facility is invalid
154 let log_facility =
155 if facility.is_empty() == false &&
156 (facility & !LogMask::LOG_FACMASK).is_empty() == true
157 {
158 facility
159 }
160 else
161 {
162 // default facility code
163 LogFacility::LOG_USER
164 };
165
166 let logtag =
167 match ident
168 {
169 Some(r) =>
170 truncate_n(r, RFC_MAX_APP_NAME).to_string(),
171 None =>
172 truncate_n(
173 portable::p_getprogname()
174 .unwrap_or("".to_string())
175 .as_str(),
176 RFC_MAX_APP_NAME
177 )
178 .to_string()
179 };
180
181 let sock = D::SocketTap::new(req_tap)?;
182
183 return Ok(
184 Self
185 {
186 logtag: logtag,
187 logpid: portable::get_pid().to_string(),
188 logstat: logstat,
189 facility: log_facility,
190 logmask: 0xff,
191 stream: sock,
192 _p: PhantomData,
193 _p2: PhantomData,
194 }
195 );
196 }
197
198 #[inline]
199 pub(crate)
200 fn is_logmasked(&self, pri: i32) -> bool
201 {
202 if ((1 << (pri & LogMask::LOG_PRIMASK)) & self.logmask) == 0
203 {
204 return true;
205 }
206
207 return false;
208 }
209
210 /// Returns the type of the socket.
211 #[inline]
212 pub(crate)
213 fn get_taptype(&self) -> TapType
214 {
215 return self.stream.get_type();
216 }
217
218 /*
219 /// Returns the maximum msg size in bytes. (Full msg with headers.)
220 #[inline]
221 pub(crate)
222 fn get_max_msg_size(&self) -> usize
223 {
224 return self.stream.get_max_msg_size();
225 }
226 */
227
228 #[inline]
229 pub(crate)
230 fn set_logtag<L: AsRef<str>>(&mut self, logtag: L, update_pid: bool)
231 {
232 self.logtag =
233 truncate_n(logtag.as_ref(), RFC_MAX_APP_NAME).to_string();
234
235 if update_pid == true
236 {
237 self.logpid = portable::get_pid().to_string();
238 }
239
240 return;
241 }
242
243 /// Disconnects the unix stream from syslog.
244 #[inline]
245 pub(crate) async
246 fn disconnectlog(&mut self) -> SyRes<()>
247 {
248 return
249 self
250 .stream
251 .disconnectlog()
252 .await
253 .map_err(|e| map_error_os!(e, "can not disconnect log properly"));
254 }
255}
256
257impl<F: SyslogFormatter + Send, D: AsyncSyslogDestination, IO: AsyncSyslogInternalIO> AsyncSyslogApi<F, D>
258for AsyncSyslogInternal<F, D, IO>
259{
260 async
261 fn update_tap_data(&mut self, tap_data: D) -> SyRes<()>
262 {
263 let is_con = self.stream.is_connected();
264
265 if is_con == true
266 {
267 self
268 .stream
269 .disconnectlog()
270 .await
271 .map_err(|e|
272 map_error_os!(e, "update_tap_data() can not disconnect log properly")
273 )?;
274 }
275
276 self.stream.update_tap_data(tap_data);
277
278 if is_con == true
279 {
280 // replace with new instance
281 self.stream.connectlog().await?;
282 }
283
284 return Ok(());
285 }
286
287 #[inline]
288 fn change_identity(&mut self, ident: &str)
289 {
290 self.set_logtag(ident, true);
291 }
292
293 async
294 fn reconnect(&mut self) -> SyRes<()>
295 {
296 self.disconnectlog().await?;
297
298 self.connectlog().await?;
299
300 return Ok(());
301 }
302
303 async
304 fn closelog(&mut self) -> SyRes<()>
305 {
306 return self.disconnectlog().await;
307 }
308
309 fn set_logmask(&mut self, logmask: i32) -> i32
310 {
311 let oldmask = self.logmask;
312
313 if logmask != 0
314 {
315 self.logmask = logmask;
316 }
317
318 return oldmask;
319 }
320
321
322 /// Connects unix stream to the syslog and sets up the properties of
323 /// the unix stream.
324 #[inline]
325 async
326 fn connectlog(&mut self) -> SyRes<()>
327 {
328 return self.stream.connectlog().await;
329 }
330
331 /// An internal function which is called by the syslog or vsyslog.
332 async
333 fn vsyslog1(&mut self, mut pri: Priority, fmt: F)
334 {
335 // check for invalid bits
336 if let Err(e) = pri.check_invalid_bits()
337 {
338 IO::send_to_stderr(self.logstat, &e.to_string()).await;
339 }
340
341 /*match check_invalid_bits(&mut pri)
342 {
343 Ok(_) => {},
344 Err(_e) => self.vsyslog1(get_internal_log(), fmt).await
345 }*/
346
347 // check priority against setlogmask
348 if self.is_logmasked(pri.bits()) == true
349 {
350 return;
351 }
352
353 // set default facility if not specified in pri
354 if (pri.bits() & LOG_FACMASK) == 0
355 {
356 pri.set_facility(self.facility);
357 };
358
359 let mut msg_formatted =
360 fmt.vsyslog1_format(D::SocketTap::get_max_msg_size(), pri, &self.logtag, &self.logpid);
361
362 // output to stderr if required
363 IO::send_to_stderr(self.logstat, msg_formatted.get_stderr_output()).await;
364
365 if self.stream.is_connected() == false
366 {
367 // open connection
368 match self.connectlog().await
369 {
370 Ok(_) => {},
371 Err(e) =>
372 {
373 IO::send_to_stderr(self.logstat, &e.into_inner() ).await;
374 return;
375 }
376 }
377 }
378
379 let fullmsg = msg_formatted.get_full_msg();
380
381
382 // There are two possible scenarios when send may fail:
383 // 1. syslog temporary unavailable
384 // 2. syslog out of buffer space
385 // If we are connected to priv socket then in case of 1 we reopen connection
386 // and retry once.
387 // If we are connected to unpriv then in case of 2 repeatedly retrying to send
388 // until syslog socket buffer space will be cleared
389
390 loop
391 {
392 match self.stream.send(fullmsg.as_bytes()).await
393 {
394 Ok(_) => return,
395 Err(err) =>
396 {
397 if self.get_taptype().is_network() == false
398 {
399 if let Some(libc::ENOBUFS) = err.raw_os_error()
400 {
401 // scenario 2
402 if self.get_taptype().is_priv() == true
403 {
404 break;
405 }
406
407 IO::sleep_micro(1).await;
408 //sleep(Duration::from_micros(1)).await;
409 }
410 else
411 {
412 // scenario 1
413 let _ = self.disconnectlog().await;
414 match self.connectlog().await
415 {
416 Ok(_) => {},
417 Err(_e) => break,
418 }
419
420 // if resend will fail then probably the scn 2 will take place
421 }
422 }
423 else
424 {
425 let _ = self.disconnectlog().await;
426 match self.connectlog().await
427 {
428 Ok(_) => {},
429 Err(_e) => break,
430 }
431 }
432 }
433 }
434 } // loop
435
436
437 // If program reached this point then transmission over socket failed.
438 // Try to output message to console
439
440 IO::send_to_syscons(self.logstat, msg_formatted.get_stderr_output()).await;
441 }
442}