syslog_rs/a_sync/
syslog_async_queue.rs1use std::marker::PhantomData;
23
24use async_trait::async_trait;
25use tokio::sync::mpsc::UnboundedSender;
26
27use crate::
28{
29 error::SyRes,
30 formatters::SyslogFormatter,
31 map_error_code,
32 socket::TapTypeData,
33 sy_sync_queue::{SyCmd, SyslogQueueAdapter},
34 Priority
35};
36
37use super::syslog_trait::AsyncSyslogApi;
38
39
40
41#[derive(Debug)]
43pub struct AsyncSyslogQueue<F: SyslogFormatter>
44{
45 tasks: UnboundedSender<SyCmd>,
47
48 _p: PhantomData<F>
49}
50
51unsafe impl<F: SyslogFormatter> Send for AsyncSyslogQueue<F> {}
52unsafe impl<F: SyslogFormatter> Sync for AsyncSyslogQueue<F> {}
53
54
55impl<F: SyslogFormatter> AsyncSyslogQueue<F>
56{
57 pub(crate) async
58 fn attachlog(queue_adapter: SyslogQueueAdapter) -> SyRes<Self>
59 {
60 return Ok(
61 Self
62 {
63 tasks: queue_adapter.consume()?,
64 _p: PhantomData::<F>,
65 }
66 );
67 }
68}
69
70
71#[async_trait]
72impl<F: SyslogFormatter> AsyncSyslogApi for AsyncSyslogQueue<F>
73{
74 async
75 fn connectlog(&mut self) -> SyRes<()>
76 {
77 let (sy_cmd, loopback) =
78 SyCmd::form_connectlog();
79
80 self
81 .tasks
82 .send(sy_cmd)
83 .map_err(|e| map_error_code!(SendError, "connectlog() error: '{}'", e))?;
84
85 return
86 loopback
87 .await
88 .map_err(|e|
89 map_error_code!(UnboundedChannelError, "channel error: '{}'", e)
90 )?;
91 }
92
93 async
94 fn setlogmask(&self, logmask: i32) -> SyRes<i32>
95 {
96 let (sy_cmd, loopback) =
97 SyCmd::form_logmask(logmask);
98
99 self
100 .tasks
101 .send(sy_cmd)
102 .map_err(|e| map_error_code!(SendError, "closelog() error: '{}'", e))?;
103
104 return
105 loopback
106 .await
107 .map_err(|e|
108 map_error_code!(UnboundedChannelError, "channel error: '{}'", e)
109 );
110 }
111
112 async
113 fn closelog(&self) -> SyRes<()>
114 {
115 let (sy_cmd, loopback) =
116 SyCmd::form_disconnectlog();
117
118 self
120 .tasks
121 .send(sy_cmd)
122 .map_err(|e|
123 map_error_code!(SendError, "closelog() error: '{}'", e)
124 )?;
125
126 return
127 loopback
128 .await
129 .map_err(|e|
130 map_error_code!(UnboundedChannelError, "channel error: '{}'", e)
131 )?;
132 }
133
134 async
135 fn syslog(&self, pri: Priority, fmt: String)
136 {
137 let sy_cmd = SyCmd::form_syslog(pri, fmt);
142
143 let _ = self.tasks.send(sy_cmd);
144
145 return;
146 }
147
148 async
149 fn vsyslog(&self, pri: Priority, fmt: &str)
150 {
151 let sy_cmd = SyCmd::form_syslog(pri, fmt.to_string());
152
153 let _ = self.tasks.send(sy_cmd);
154
155 return;
156 }
157
158 async
159 fn change_identity(&self, ident: &str) -> SyRes<()>
160 {
161 let sy_cmd = SyCmd::form_change_ident(ident.to_string());
162
163 return
164 self
165 .tasks
166 .send(sy_cmd)
167 .map_err(|e|
168 map_error_code!(SendError, "change_identity() error: '{}'", e)
169 );
170 }
171
172 async
173 fn reconnect(&self) -> SyRes<()>
174 {
175 return
176 self
177 .tasks
178 .send(SyCmd::form_reconnect())
179 .map_err(|e|
180 map_error_code!(SendError, "signal_rotate_log() error: '{}'", e)
181 );
182 }
183
184 async
185 fn update_tap_data(&self, tap_data: TapTypeData) -> SyRes<()>
186 {
187 let (tap_data_cmd, loopback) = SyCmd::form_update_tap(tap_data);
188
189 self
190 .tasks
191 .send(tap_data_cmd)
192 .map_err(|e|
193 map_error_code!(SendError, "signal_rotate_log() error: '{}'", e)
194 )?;
195
196 return
197 loopback
198 .await
199 .map_err(|e|
200 map_error_code!(UnboundedChannelError, "channel error: '{}'", e)
201 )?;
202 }
203}
204