1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
pub use anyhow::{Error, Result};
pub use flume::bounded as flume_bounded;
use futures_lite::future::Boxed;
pub use futures_lite::stream::{Stream, StreamExt};
use futures_util::SinkExt;
#[cfg(feature = "hyper")]
pub use quic_rpc::transport::hyper::{HyperConnection, HyperServerEndpoint};
#[cfg(feature = "quinn")]
pub use quic_rpc::transport::quinn::{QuinnConnection, QuinnServerEndpoint};
pub use quic_rpc::{
    client::{BoxStreamSync, UpdateSink},
    message::{
        BidiStreaming, BidiStreamingMsg, ClientStreaming, ClientStreamingMsg, Msg, RpcMsg,
        ServerStreaming, ServerStreamingMsg,
    },
    pattern::{
        bidi_streaming::{Error as BidiStreamingError, ItemError as BidiStreamingItemError},
        client_streaming::{Error as ClientStreamingError, ItemError as ClientStreamingItemError},
        rpc::Error as RpcError,
        server_streaming::{Error as ServerStreamingError, ItemError as ServerStreamingItemError},
    },
    server::RpcChannel,
    transport::{
        flume::{connection as flume_connection, FlumeConnection, FlumeServerEndpoint},
        ConnectionErrors,
    },
    RpcClient, RpcMessage, RpcServer, Service, ServiceConnection, ServiceEndpoint,
};
use std::{
    cell::RefCell,
    future::Future,
    pin::Pin,
    sync::{Arc, OnceLock},
};
use tokio::runtime::Builder;
pub use tokio::{pin, runtime::Runtime};
use tracing::{debug, error, warn};

/// Obtains the handler for a specific service.
///
/// `T` is the service type whose handler is returned.
pub trait GetServiceHandler<T: Service> {
    /// Takes a shared pointer to `self` and returns a shared pointer to the
    /// handler for service `T`.
    fn get_handler(self: Arc<Self>) -> Arc<T>;
}

/// A handler for a service.
pub trait ServiceHandler<T: Service> {
    /// Used on the server side to process a request and its response.
    ///
    /// # Arguments
    ///
    /// * `req`: the request parameters.
    /// * `chan`: the connection channel.
    /// * `rt`: the async runtime.
    ///
    /// returns: `impl Future<Output=Result<(), Error>>+Send+Sized` — whether the
    /// request was handled successfully.
    ///
    /// # Examples
    ///
    /// ```text
    /// None
    /// ```
    fn handle_rpc_request<S, E>(
        self: Arc<Self>,
        req: T::Req,
        chan: RpcChannel<S, E, T>,
        rt: &'static Runtime,
    ) -> impl Future<Output = Result<()>> + Send
    where
        E: ServiceEndpoint<S>,
        S: Service;
}

/// Runs the accept loop for `server`, dispatching every request to the service handler.
///
/// One `S::default()` instance is created up front and shared (via `Arc`) by all
/// requests. Each request is handled in a task spawned on a process-wide
/// multi-threaded Tokio runtime that is built lazily on first use.
///
/// The loop never terminates on its own: a failure to accept a connection or to
/// read its first request is logged at `warn` level and the loop moves on to the
/// next connection. Errors returned by the handler are logged as well.
///
/// # Panics
///
/// Panics if the Tokio runtime cannot be built.
pub async fn run_server<E, S>(server: RpcServer<S, E>)
where
    S: Service + Default + ServiceHandler<S>,
    E: ServiceEndpoint<S>,
{
    // Shared by every call so that all servers spawn onto the same runtime.
    static RT: OnceLock<Runtime> = OnceLock::new();

    let service = Arc::new(S::default());
    debug!("{:?}", service);
    let rt = RT.get_or_init(|| {
        Builder::new_multi_thread()
            .enable_all()
            .build()
            .expect("failed to build the tokio runtime for the RPC server")
    });
    loop {
        let accepting = match server.accept().await {
            Ok(accepting) => accepting,
            Err(err) => {
                // Report the failure instead of dropping it silently.
                // NOTE(review): a persistently failing endpoint is retried
                // immediately; consider a backoff or a shutdown condition.
                warn!(?err, "failed to accept incoming connection");
                continue;
            }
        };
        match accepting.read_first().await {
            Err(err) => warn!(?err, "server accept failed"),
            Ok((req, chan)) => {
                let handler = Arc::clone(&service);
                // Handle the request in its own task so the accept loop stays responsive.
                rt.spawn(async move {
                    if let Err(err) = S::handle_rpc_request(handler, req, chan, rt).await {
                        warn!(?err, "internal rpc error");
                    }
                });
            }
        }
    }
}

/// Client-side handle for a client-streaming call.
///
/// Holds the update sink (wrapped in a `RefCell`) together with the boxed future
/// that resolves to the final `Result<R>`. Both are stored in a `OnceLock` so
/// that they can be taken out of the handle exactly once.
///
/// Type parameters:
/// * `T`: item type pushed into the sink; must convert into `I::Req`.
/// * `S`: service the connection `C` belongs to.
/// * `C`: connection type for service `S`.
/// * `R`: value produced by the result future on success.
/// * `I`: service whose request type `T` converts into (defaults to `S`).
pub struct ClientStreamingResponse<T, S, C, R, I = S>(
    OnceLock<(RefCell<UpdateSink<S, C, T, I>>, Boxed<Result<R>>)>,
)
where
    S: Service,
    C: ServiceConnection<S>,
    I: Service,
    T: Into<I::Req>;

impl<T, S, C, R, I> ClientStreamingResponse<T, S, C, R, I>
where
    S: Service,
    I: Service,
    C: ServiceConnection<S>,
    T: Into<I::Req>,
{
    pub fn new(
        sink: UpdateSink<S, C, T, I>,
        result: impl Future<Output = Result<R>> + Send + 'static,
    ) -> Self {
        let lock = OnceLock::new();
        let _ = lock.set((RefCell::new(sink), Box::pin(result) as _));
        Self(lock)
    }

    pub async fn put(&mut self, item: T) -> &mut Self {
        let Some((sink, _)) = self.0.get() else {
            return self;
        };
        if let Err(e) = sink.borrow_mut().send(item).await {
            error!("Send data error. ({})", e);
        }
        self
    }

    pub async fn result(&mut self) -> Result<R> {
        let Some((sink, fut)) = self.0.take() else {
            panic!("Only once call.");
        };
        sink.borrow_mut().close().await.unwrap();
        drop(sink.into_inner());
        fut.await
    }
}

/// Wrapper around a pinned, boxed stream whose items are `Result<R, Error>`.
pub struct ServerStreamingResponse<R>(Pin<Box<dyn Stream<Item = Result<R, Error>>>>);

impl<R> ServerStreamingResponse<R> {
    pub fn new(stream: impl Stream<Item = Result<R, Error>> + 'static) -> Self {
        Self(Box::pin(stream) as _)
    }

    pub async fn next(&mut self) -> Option<Result<R>> {
        self.0.next().await
    }
}