1#[cfg(feature = "reconnect_and_persistence")]
2use crate::reconnect_and_persistence::{
3 reconnect_and_persistence, TLReconnectAndPersistenceSetting,
4};
5use bytes::Bytes;
6use chrono::Utc;
7use flume::Receiver;
8use hyper::Uri;
9use serde::{Deserialize, Serialize};
10use std::future::Future;
11use std::task::{Context, Poll};
12use std::time::Duration;
13use tokio::io::{AsyncRead, AsyncSeek, AsyncWrite};
14use tokio::task::yield_now;
15use tokio::time::Instant;
16use tonic::codec::CompressionEncoding;
17use tonic::codegen::{Service, StdError};
18use tonic::transport::{Channel, Endpoint, Error};
19use tonic::Status;
20use tracing::instrument::{WithDispatch, WithSubscriber};
21use tracing::subscriber::NoSubscriber;
22use tracing_core::Dispatch;
23use tracing_lv_core::proto::tracing_service_client::TracingServiceClient;
24use tracing_lv_core::proto::{record_param, AppStop, PingParam, RecordParam};
25use tracing_lv_core::{MsgReceiverSubscriber, TLAppInfoExt};
26use tracing_lv_core::{TLAppInfo, TLLayer, TracingLiveMsgSubscriber};
27use tracing_subscriber::layer::{Layered, SubscriberExt};
28use tracing_subscriber::registry::LookupSpan;
29use uuid::Uuid;
30
/// Wrapper around an inner service `T`.
///
/// Its `Service` implementation in this file installs [`NoSubscriber`] as the
/// default tracing subscriber while the inner service is polled or called, and
/// attaches [`NoSubscriber`] to the future returned by `call`.
// NOTE(review): presumably this keeps the transport's own tracing output from
// being fed back into the layer that is sending records — confirm.
pub struct NoSubscriberService<T>(T);
/// Executor that spawns every future on the Tokio runtime with
/// [`NoSubscriber`] attached as its tracing subscriber.
pub struct NoSubscriberExecutor;
33
34impl<F> hyper::rt::Executor<F> for NoSubscriberExecutor
35where
36 F: Future + Send + 'static,
37 F::Output: Send + 'static,
38{
39 fn execute(&self, fut: F) {
40 tokio::spawn(fut.with_subscriber(NoSubscriber::new()));
41 }
42}
43
44impl<T, P> Service<P> for NoSubscriberService<T>
45where
46 T: Service<P>,
47{
48 type Response = T::Response;
49 type Error = T::Error;
50 type Future = WithDispatch<T::Future>;
51
52 fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
53 let _guard = tracing::subscriber::set_default(NoSubscriber::new());
54 self.0.poll_ready(cx)
55 }
56
57 fn call(&mut self, request: P) -> Self::Future {
58 let _guard = tracing::subscriber::set_default(NoSubscriber::new());
59 self.0.call(request).with_subscriber(NoSubscriber::new())
60 }
61}
62
63pub struct TLGuard {
64 msg_sender: flume::Sender<RecordParam>,
65 is_normal_drop: bool,
66}
67
68impl TLGuard {
69 pub fn normal_stop(&mut self) {
70 self.is_normal_drop = true;
71
72 let _ = self.msg_sender.send(RecordParam {
73 send_time: Utc::now().timestamp_nanos_opt().unwrap(),
74 record_index: 0,
75 variant: Some(record_param::Variant::AppStop(AppStop {})),
76 });
77 }
78}
79
80impl Drop for TLGuard {
81 fn drop(&mut self) {
82 if !self.is_normal_drop {
83 let _ = self.msg_sender.send(RecordParam {
85 send_time: Utc::now().timestamp_nanos_opt().unwrap(),
86 record_index: 0,
87 variant: Some(record_param::Variant::AppStop(AppStop {})),
88 });
89 }
90 }
91}
92
93pub async fn default_connect(
94 endpoint: impl TryInto<Endpoint, Error: Into<StdError>>,
95) -> Result<Channel, Error> {
96 Endpoint::new(endpoint)?
97 .executor(NoSubscriberExecutor)
98 .buffer_size(1024 * 1024 * 8)
100 .keep_alive_while_idle(true)
101 .keep_alive_timeout(Duration::from_secs(120))
102 .connect()
103 .await
104}
105
/// Something that can be turned into a connected tonic [`Channel`].
///
/// Implemented in this file for address-like values (`String`, `&'static str`,
/// `Bytes`, `Uri`), for an already connected `Channel`, for closures that
/// produce a connecting future, and for a `Vec` of endpoints.
pub trait TonicTryConnect {
    /// Consumes `self` and returns a connected channel or the transport error.
    async fn try_connect(self) -> Result<Channel, Error>;
}
109
110impl TonicTryConnect for String {
111 async fn try_connect(self) -> Result<Channel, Error> {
112 default_connect(self).await
113 }
114}
115impl TonicTryConnect for &'static str {
116 async fn try_connect(self) -> Result<Channel, Error> {
117 default_connect(self).await
118 }
119}
120impl TonicTryConnect for Bytes {
121 async fn try_connect(self) -> Result<Channel, Error> {
122 default_connect(self).await
123 }
124}
125impl TonicTryConnect for Uri {
126 async fn try_connect(self) -> Result<Channel, Error> {
127 let endpoint: Endpoint = self.into();
128 default_connect(endpoint).await
129 }
130}
131impl TonicTryConnect for Channel {
132 async fn try_connect(self) -> Result<Channel, Error> {
133 Ok(self)
134 }
135}
136impl<F, FO> TonicTryConnect for F
137where
138 FO: Future<Output = Result<Channel, Error>>,
139 F: FnOnce() -> FO,
140{
141 async fn try_connect(self) -> Result<Channel, Error> {
142 self().await
143 }
144}
145
146impl<T> TonicTryConnect for Vec<T>
147where
148 T: TryInto<Endpoint>,
149 T::Error: Into<StdError>,
150{
151 async fn try_connect(self) -> Result<Channel, Error> {
152 let mut prev_err = None;
153 for endpoint in self.into_iter() {
154 match default_connect(endpoint).await {
155 Ok(n) => return Ok(n),
156 Err(err) => {
157 prev_err = Some(err);
158 continue;
159 }
160 }
161 }
162 Err(prev_err.expect("no found invalid endpoint"))
163 }
164}
165
/// Marker trait for types that implement both [`AsyncWrite`] and [`AsyncSeek`].
pub trait AsyncWriteWithSeek: AsyncWrite + AsyncSeek {}

/// Blanket implementation: every `AsyncWrite + AsyncSeek` type is an
/// `AsyncWriteWithSeek`.
impl<T> AsyncWriteWithSeek for T where T: AsyncWrite + AsyncSeek {}
/// Marker trait for types that implement [`AsyncWriteWithSeek`] and
/// [`AsyncRead`].
pub trait AsyncWriteWithReadAndSeek: AsyncWriteWithSeek + AsyncRead {}

/// Blanket implementation: every `AsyncWriteWithSeek + AsyncRead` type is an
/// `AsyncWriteWithReadAndSeek`.
impl<T> AsyncWriteWithReadAndSeek for T where T: AsyncWriteWithSeek + AsyncRead {}
172
/// Options passed to `TLSubscriberExt::with_tracing_lv`.
///
/// The derived `Default` leaves every option unset.
#[derive(Default)]
pub struct TLSetting {
    /// Settings forwarded to `reconnect_and_persistence`. When `None`, records
    /// are streamed by `tracing_msg_subscriber` instead. Only present with the
    /// `reconnect_and_persistence` feature.
    #[cfg(feature = "reconnect_and_persistence")]
    pub reconnect_and_persistence: Option<TLReconnectAndPersistenceSetting>,
}
178
179pub trait TLSubscriberExt: Sized {
180 async fn with_tracing_lv<D>(
181 self,
182 dst: D,
183 app_info: TLAppInfo,
184 setting: TLSetting,
185 ) -> Result<
186 (
187 Layered<TLLayer<Box<dyn TracingLiveMsgSubscriber>>, Self>,
188 impl Future<Output = ()> + Send + 'static,
189 TLGuard,
190 ),
191 Status,
192 >
193 where
194 D: TonicTryConnect;
195
196 async fn tracing_lv_init<D, U, F: Future<Output = U> + 'static>(
197 self,
198 dst: D,
199 app_info: TLAppInfo,
200 setting: TLSetting,
201 f: impl FnOnce() -> F,
202 ) -> Result<U, Status>
203 where
204 D: TonicTryConnect,
205 Layered<TLLayer<Box<dyn TracingLiveMsgSubscriber>>, Self>: Into<Dispatch>,
206 {
207 use tracing_subscriber::util::SubscriberInitExt;
208 let (layered, future, mut _guard) = self.with_tracing_lv(dst, app_info, setting).await?;
209
210 let handle = tokio::spawn(future);
211 layered.init();
212 let r = f().await;
213 _guard.normal_stop();
214 drop(_guard);
215 handle.await.unwrap();
216 Ok(r)
217 }
218}
219
/// Serializable bookkeeping record for one application run.
// NOTE(review): nothing in this part of the file reads or writes this type;
// the field descriptions below are inferred from names only — confirm against
// the persistence code.
#[derive(Debug, Serialize, Deserialize)]
pub struct AppRunData {
    /// Identifier of the application run.
    pub app_run_id: Uuid,
    /// Presumably the start offset of this run's data — TODO confirm.
    pub start_pos: u64,
    /// Presumably the end offset of this run's data — TODO confirm.
    pub end_pos: u64,
    /// Presumably the index of the last record handled for this run — TODO confirm.
    pub last_record_index: u64,
}
227
// NOTE(review): presumably `refining_impl_trait` is allowed because this impl's
// signature refines the trait's return-position `impl Trait` — confirm that the
// allow is still required.
#[allow(refining_impl_trait)]
impl<T> TLSubscriberExt for T
where
    T: SubscriberExt + for<'a> LookupSpan<'a>,
{
    /// Connects to `dst`, queues the `AppStart` record and wraps `self` with a
    /// [`TLLayer`].
    ///
    /// Returns the layered subscriber, the future that sends queued records to
    /// the server, and a [`TLGuard`] holding a sender for the same queue.
    ///
    /// # Errors
    /// Returns a `Status` when connecting fails (the transport error is wrapped
    /// with `Status::from_error`) or when the initial `ping` call fails.
    async fn with_tracing_lv<D>(
        self,
        dst: D,
        app_info: TLAppInfo,
        _setting: TLSetting,
    ) -> Result<
        (
            Layered<TLLayer<Box<dyn TracingLiveMsgSubscriber>>, Self>,
            impl Future<Output = ()> + Send + 'static,
            TLGuard,
        ),
        Status,
    >
    where
        D: TonicTryConnect,
    {
        // Fresh random identifier for this application run.
        let run_id = Uuid::new_v4();

        let channel = dst
            .try_connect()
            .await
            .map_err(|err| Status::from_error(Box::new(err)))?;

        // The channel is wrapped in `NoSubscriberService`; Zstd compression is
        // enabled in both directions and the decoding size limit is raised to
        // `usize::MAX`.
        let mut client = TracingServiceClient::new(NoSubscriberService(channel))
            .send_compressed(CompressionEncoding::Zstd)
            .max_decoding_message_size(usize::MAX)
            .accept_compressed(CompressionEncoding::Zstd);
        // Unbounded queue between the layer/guard (senders) and the sending future.
        let (msg_sender, msg_receiver) = flume::unbounded();

        let _app_start = {
            // Time one ping round trip; the measured duration is handed to
            // `into_app_start` together with the run id.
            let instant = Instant::now();
            let _ = client.ping(PingParam {}).await?;
            let rtt = instant.elapsed();
            let app_start = app_info.into_app_start(run_id, rtt);
            // `AppStart` is the first record put on the queue, with index 0.
            // The receiver is still in scope here, so this send cannot fail.
            msg_sender
                .send(RecordParam {
                    send_time: app_start.record_time.clone(),
                    record_index: 0,
                    variant: Some(record_param::Variant::AppStart(app_start.clone())),
                })
                .unwrap();
            app_start
        };

        // With the feature enabled, the setting decides between plain streaming
        // (`None`) and the reconnect/persistence implementation (`Some`). The
        // two futures have different types, so they are unified through
        // `left_future` / `right_future`.
        #[cfg(feature = "reconnect_and_persistence")]
        let (subscriber, future) = {
            use futures_util::FutureExt;
            match _setting.reconnect_and_persistence {
                None => (
                    Box::new(MsgReceiverSubscriber::new(msg_sender.clone())) as _,
                    tracing_msg_subscriber(client, msg_receiver).left_future(),
                ),
                Some(_setting) => {
                    let (subscriber, future) = reconnect_and_persistence(
                        _setting,
                        msg_sender.clone(),
                        msg_receiver,
                        _app_start,
                        client,
                    )
                    .await;
                    (subscriber, future.right_future())
                }
            }
        };

        // Without the feature only plain streaming exists; `_app_start` is not
        // needed any further and is dropped explicitly.
        #[cfg(not(feature = "reconnect_and_persistence"))]
        let (subscriber, future) = {
            drop(_app_start);
            (
                Box::new(MsgReceiverSubscriber::new(msg_sender.clone())) as _,
                crate::client::tracing_msg_subscriber(client, msg_receiver),
            )
        };

        Ok((
            self.with(TLLayer {
                subscriber,
                enable_enter: false,
                // Starts at 1; index 0 was used for the `AppStart` record above.
                record_index: 1.into(),
            }),
            future,
            TLGuard {
                msg_sender,
                is_normal_drop: false,
            },
        ))
    }
}
322
/// Builds the future that streams queued records to the server.
///
/// Records received from `msg_receiver` are turned into a request stream for
/// `client.app_run`. Once an `AppStop` record is seen, records still sitting in
/// the queue are sent first and `AppStop` is sent last, after which the stream
/// ends. The future then reads the response stream until it ends or yields an
/// error. Errors are printed to stderr; nothing is returned.
fn tracing_msg_subscriber(
    client: TracingServiceClient<NoSubscriberService<Channel>>,
    msg_receiver: Receiver<RecordParam>,
) -> impl Future<Output = ()> + Sized + Send + 'static {
    async move {
        let app_run = |mut client: TracingServiceClient<NoSubscriberService<Channel>>| {
            // Only a clone goes into the stream; the receiver captured by the
            // outer `async move` block stays alive until this future finishes.
            let msg_receiver = msg_receiver.clone();
            async move {
                // Stream state: (receiver, held-back `AppStop` record if one
                // has been seen, whether the previous item was the last one).
                let stream = futures_util::stream::unfold(
                    (msg_receiver, None, false),
                    move |(msg_receiver, mut app_stop, is_end)| async move {
                        if is_end {
                            return None;
                        }
                        let (mut param, app_stop, is_end) = if app_stop.is_some() {
                            // Draining phase: `AppStop` is held back. Emit any
                            // record still queued; once the queue is empty,
                            // emit `AppStop` itself and flag the end.
                            yield_now().await;
                            let param = msg_receiver
                                .try_recv()
                                .ok()
                                .or_else(|| app_stop.take())
                                .unwrap();
                            let is_end = app_stop.is_none();
                            (param, app_stop, is_end)
                        } else {
                            // Normal phase: wait for the next record. A receive
                            // error ends the stream via `?`.
                            let param = msg_receiver.recv_async().await.ok()?;
                            // Panics if a record has no variant set.
                            if matches!(
                                param.variant.as_ref().unwrap(),
                                record_param::Variant::AppStop(_)
                            ) {
                                // Hold `AppStop` back, yield and wait one
                                // second, then emit a queued record if there is
                                // one, otherwise `AppStop` itself.
                                let mut app_stop = Some(param);
                                yield_now().await;
                                tokio::time::sleep(Duration::from_secs(1)).await;
                                let param = msg_receiver
                                    .try_recv()
                                    .ok()
                                    .or_else(|| app_stop.take())
                                    .unwrap();
                                let is_end = app_stop.is_none();
                                (param, app_stop, is_end)
                            } else {
                                (param, None, false)
                            }
                        };
                        // Every emitted record is re-stamped with the current UTC time.
                        param.send_time = Utc::now().timestamp_nanos_opt().unwrap();
                        Some((param, (msg_receiver, app_stop, is_end)))
                    },
                );
                (client.app_run(stream).await, client)
            }
        };
        let (r, _client) = app_run(client).await;
        use futures_util::StreamExt;
        match r {
            Ok(mut response) => {
                // Read the response stream until it ends; stop at the first error.
                while let Some(item) = response.get_mut().next().await {
                    match item {
                        Ok(_) => {}
                        Err(err) => {
                            eprintln!("tracing live stream error. {err:}");
                            break;
                        }
                    }
                }
            }
            Err(err) => {
                eprintln!("tracing live connect error. {err:}")
            }
        }
    }
}