tracing_lv/
client.rs

1#[cfg(feature = "reconnect_and_persistence")]
2use crate::reconnect_and_persistence::{
3    reconnect_and_persistence, TLReconnectAndPersistenceSetting,
4};
5use bytes::Bytes;
6use chrono::Utc;
7use flume::Receiver;
8use hyper::Uri;
9use serde::{Deserialize, Serialize};
10use std::future::Future;
11use std::task::{Context, Poll};
12use std::time::Duration;
13use tokio::io::{AsyncRead, AsyncSeek, AsyncWrite};
14use tokio::task::yield_now;
15use tokio::time::Instant;
16use tonic::codec::CompressionEncoding;
17use tonic::codegen::{Service, StdError};
18use tonic::transport::{Channel, Endpoint, Error};
19use tonic::Status;
20use tracing::instrument::{WithDispatch, WithSubscriber};
21use tracing::subscriber::NoSubscriber;
22use tracing_core::Dispatch;
23use tracing_lv_core::proto::tracing_service_client::TracingServiceClient;
24use tracing_lv_core::proto::{record_param, AppStop, PingParam, RecordParam};
25use tracing_lv_core::{MsgReceiverSubscriber, TLAppInfoExt};
26use tracing_lv_core::{TLAppInfo, TLLayer, TracingLiveMsgSubscriber};
27use tracing_subscriber::layer::{Layered, SubscriberExt};
28use tracing_subscriber::registry::LookupSpan;
29use uuid::Uuid;
30
31pub struct NoSubscriberService<T>(T);
32pub struct NoSubscriberExecutor;
33
34impl<F> hyper::rt::Executor<F> for NoSubscriberExecutor
35where
36    F: Future + Send + 'static,
37    F::Output: Send + 'static,
38{
39    fn execute(&self, fut: F) {
40        tokio::spawn(fut.with_subscriber(NoSubscriber::new()));
41    }
42}
43
44impl<T, P> Service<P> for NoSubscriberService<T>
45where
46    T: Service<P>,
47{
48    type Response = T::Response;
49    type Error = T::Error;
50    type Future = WithDispatch<T::Future>;
51
52    fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
53        let _guard = tracing::subscriber::set_default(NoSubscriber::new());
54        self.0.poll_ready(cx)
55    }
56
57    fn call(&mut self, request: P) -> Self::Future {
58        let _guard = tracing::subscriber::set_default(NoSubscriber::new());
59        self.0.call(request).with_subscriber(NoSubscriber::new())
60    }
61}
62
63pub struct TLGuard {
64    msg_sender: flume::Sender<RecordParam>,
65    is_normal_drop: bool,
66}
67
68impl TLGuard {
69    pub fn normal_stop(&mut self) {
70        self.is_normal_drop = true;
71
72        let _ = self.msg_sender.send(RecordParam {
73            send_time: Utc::now().timestamp_nanos_opt().unwrap(),
74            record_index: 0,
75            variant: Some(record_param::Variant::AppStop(AppStop {})),
76        });
77    }
78}
79
80impl Drop for TLGuard {
81    fn drop(&mut self) {
82        if !self.is_normal_drop {
83            // TODO:
84            let _ = self.msg_sender.send(RecordParam {
85                send_time: Utc::now().timestamp_nanos_opt().unwrap(),
86                record_index: 0,
87                variant: Some(record_param::Variant::AppStop(AppStop {})),
88            });
89        }
90    }
91}
92
93pub async fn default_connect(
94    endpoint: impl TryInto<Endpoint, Error: Into<StdError>>,
95) -> Result<Channel, Error> {
96    Endpoint::new(endpoint)?
97        .executor(NoSubscriberExecutor)
98        // .tcp_nodelay(false)
99        .buffer_size(1024 * 1024 * 8)
100        .keep_alive_while_idle(true)
101        .keep_alive_timeout(Duration::from_secs(120))
102        .connect()
103        .await
104}
105
106pub trait TonicTryConnect {
107    async fn try_connect(self) -> Result<Channel, Error>;
108}
109
110impl TonicTryConnect for String {
111    async fn try_connect(self) -> Result<Channel, Error> {
112        default_connect(self).await
113    }
114}
115impl TonicTryConnect for &'static str {
116    async fn try_connect(self) -> Result<Channel, Error> {
117        default_connect(self).await
118    }
119}
120impl TonicTryConnect for Bytes {
121    async fn try_connect(self) -> Result<Channel, Error> {
122        default_connect(self).await
123    }
124}
125impl TonicTryConnect for Uri {
126    async fn try_connect(self) -> Result<Channel, Error> {
127        let endpoint: Endpoint = self.into();
128        default_connect(endpoint).await
129    }
130}
131impl TonicTryConnect for Channel {
132    async fn try_connect(self) -> Result<Channel, Error> {
133        Ok(self)
134    }
135}
136impl<F, FO> TonicTryConnect for F
137where
138    FO: Future<Output = Result<Channel, Error>>,
139    F: FnOnce() -> FO,
140{
141    async fn try_connect(self) -> Result<Channel, Error> {
142        self().await
143    }
144}
145
146impl<T> TonicTryConnect for Vec<T>
147where
148    T: TryInto<Endpoint>,
149    T::Error: Into<StdError>,
150{
151    async fn try_connect(self) -> Result<Channel, Error> {
152        let mut prev_err = None;
153        for endpoint in self.into_iter() {
154            match default_connect(endpoint).await {
155                Ok(n) => return Ok(n),
156                Err(err) => {
157                    prev_err = Some(err);
158                    continue;
159                }
160            }
161        }
162        Err(prev_err.expect("no found invalid endpoint"))
163    }
164}
165
166pub trait AsyncWriteWithSeek: AsyncWrite + AsyncSeek {}
167
168impl<T> AsyncWriteWithSeek for T where T: AsyncWrite + AsyncSeek {}
169pub trait AsyncWriteWithReadAndSeek: AsyncWriteWithSeek + AsyncRead {}
170
171impl<T> AsyncWriteWithReadAndSeek for T where T: AsyncWriteWithSeek + AsyncRead {}
172
173#[derive(Default)]
174pub struct TLSetting {
175    #[cfg(feature = "reconnect_and_persistence")]
176    pub reconnect_and_persistence: Option<TLReconnectAndPersistenceSetting>,
177}
178
179pub trait TLSubscriberExt: Sized {
180    async fn with_tracing_lv<D>(
181        self,
182        dst: D,
183        app_info: TLAppInfo,
184        setting: TLSetting,
185    ) -> Result<
186        (
187            Layered<TLLayer<Box<dyn TracingLiveMsgSubscriber>>, Self>,
188            impl Future<Output = ()> + Send + 'static,
189            TLGuard,
190        ),
191        Status,
192    >
193    where
194        D: TonicTryConnect;
195
196    async fn tracing_lv_init<D, U, F: Future<Output = U> + 'static>(
197        self,
198        dst: D,
199        app_info: TLAppInfo,
200        setting: TLSetting,
201        f: impl FnOnce() -> F,
202    ) -> Result<U, Status>
203    where
204        D: TonicTryConnect,
205        Layered<TLLayer<Box<dyn TracingLiveMsgSubscriber>>, Self>: Into<Dispatch>,
206    {
207        use tracing_subscriber::util::SubscriberInitExt;
208        let (layered, future, mut _guard) = self.with_tracing_lv(dst, app_info, setting).await?;
209
210        let handle = tokio::spawn(future);
211        layered.init();
212        let r = f().await;
213        _guard.normal_stop();
214        drop(_guard);
215        handle.await.unwrap();
216        Ok(r)
217    }
218}
219
220#[derive(Debug, Serialize, Deserialize)]
221pub struct AppRunData {
222    pub app_run_id: Uuid,
223    pub start_pos: u64,
224    pub end_pos: u64,
225    pub last_record_index: u64,
226}
227
228#[allow(refining_impl_trait)]
229impl<T> TLSubscriberExt for T
230where
231    T: SubscriberExt + for<'a> LookupSpan<'a>,
232{
233    async fn with_tracing_lv<D>(
234        self,
235        dst: D,
236        app_info: TLAppInfo,
237        _setting: TLSetting,
238    ) -> Result<
239        (
240            Layered<TLLayer<Box<dyn TracingLiveMsgSubscriber>>, Self>,
241            impl Future<Output = ()> + Send + 'static,
242            TLGuard,
243        ),
244        Status,
245    >
246    where
247        D: TonicTryConnect,
248    {
249        let run_id = Uuid::new_v4();
250
251        let channel = dst
252            .try_connect()
253            .await
254            .map_err(|err| Status::from_error(Box::new(err)))?;
255
256        let mut client = TracingServiceClient::new(NoSubscriberService(channel))
257            .send_compressed(CompressionEncoding::Zstd)
258            .max_decoding_message_size(usize::MAX)
259            .accept_compressed(CompressionEncoding::Zstd);
260        let (msg_sender, msg_receiver) = flume::unbounded();
261
262        let _app_start = {
263            let instant = Instant::now();
264            let _ = client.ping(PingParam {}).await?;
265            let rtt = instant.elapsed();
266            let app_start = app_info.into_app_start(run_id, rtt);
267            msg_sender
268                .send(RecordParam {
269                    send_time: app_start.record_time.clone(),
270                    record_index: 0,
271                    variant: Some(record_param::Variant::AppStart(app_start.clone())),
272                })
273                .unwrap();
274            app_start
275        };
276
277        #[cfg(feature = "reconnect_and_persistence")]
278        let (subscriber, future) = {
279            use futures_util::FutureExt;
280            match _setting.reconnect_and_persistence {
281                None => (
282                    Box::new(MsgReceiverSubscriber::new(msg_sender.clone())) as _,
283                    tracing_msg_subscriber(client, msg_receiver).left_future(),
284                ),
285                Some(_setting) => {
286                    let (subscriber, future) = reconnect_and_persistence(
287                        _setting,
288                        msg_sender.clone(),
289                        msg_receiver,
290                        _app_start,
291                        client,
292                    )
293                    .await;
294                    (subscriber, future.right_future())
295                }
296            }
297        };
298
299        #[cfg(not(feature = "reconnect_and_persistence"))]
300        let (subscriber, future) = {
301            drop(_app_start);
302            (
303                Box::new(MsgReceiverSubscriber::new(msg_sender.clone())) as _,
304                crate::client::tracing_msg_subscriber(client, msg_receiver),
305            )
306        };
307
308        Ok((
309            self.with(TLLayer {
310                subscriber,
311                enable_enter: false,
312                record_index: 1.into(),
313            }),
314            future,
315            TLGuard {
316                msg_sender,
317                is_normal_drop: false,
318            },
319        ))
320    }
321}
322
323fn tracing_msg_subscriber(
324    client: TracingServiceClient<NoSubscriberService<Channel>>,
325    msg_receiver: Receiver<RecordParam>,
326) -> impl Future<Output = ()> + Sized + Send + 'static {
327    async move {
328        let app_run = |mut client: TracingServiceClient<NoSubscriberService<Channel>>| {
329            let msg_receiver = msg_receiver.clone();
330            async move {
331                let stream = futures_util::stream::unfold(
332                    (msg_receiver, None, false),
333                    move |(msg_receiver, mut app_stop, is_end)| async move {
334                        if is_end {
335                            return None;
336                        }
337                        let (mut param, app_stop, is_end) = if app_stop.is_some() {
338                            yield_now().await;
339                            let param = msg_receiver
340                                .try_recv()
341                                .ok()
342                                .or_else(|| app_stop.take())
343                                .unwrap();
344                            let is_end = app_stop.is_none();
345                            (param, app_stop, is_end)
346                        } else {
347                            let param = msg_receiver.recv_async().await.ok()?;
348                            if matches!(
349                                param.variant.as_ref().unwrap(),
350                                record_param::Variant::AppStop(_)
351                            ) {
352                                let mut app_stop = Some(param);
353                                yield_now().await;
354                                tokio::time::sleep(Duration::from_secs(1)).await;
355                                let param = msg_receiver
356                                    .try_recv()
357                                    .ok()
358                                    .or_else(|| app_stop.take())
359                                    .unwrap();
360                                let is_end = app_stop.is_none();
361                                (param, app_stop, is_end)
362                            } else {
363                                (param, None, false)
364                            }
365                        };
366                        param.send_time = Utc::now().timestamp_nanos_opt().unwrap();
367                        Some((param, (msg_receiver, app_stop, is_end)))
368                    },
369                );
370                (client.app_run(stream).await, client)
371            }
372        };
373        let (r, _client) = app_run(client).await;
374        use futures_util::StreamExt;
375        match r {
376            Ok(mut response) => {
377                while let Some(item) = response.get_mut().next().await {
378                    match item {
379                        Ok(_) => {}
380                        Err(err) => {
381                            eprintln!("tracing live stream error. {err:}");
382                            break;
383                        }
384                    }
385                }
386            }
387            Err(err) => {
388                eprintln!("tracing live connect error. {err:}")
389            }
390        }
391    }
392}