atm0s_reverse_proxy_relayer/
agent.rs1use derive_more::derive::{Deref, Display, From};
2use protocol::proxy::AgentId;
3use tokio::{
4 io::{AsyncRead, AsyncWrite},
5 sync::{mpsc::Sender, oneshot},
6};
7
8pub mod quic;
9pub mod tcp;
10pub mod tls;
11
12#[derive(Debug, Hash, Display, PartialEq, Eq, From, Deref, Clone, Copy)]
13pub struct AgentSessionId(u64);
14
15impl AgentSessionId {
16 pub fn rand() -> Self {
17 Self(rand::random())
18 }
19}
20
21enum AgentSessionControl<S> {
22 CreateStream(oneshot::Sender<anyhow::Result<S>>),
23}
24
25#[derive(Debug)]
26pub struct AgentSession<S> {
27 agent_id: AgentId,
28 session_id: AgentSessionId,
29 domain: String,
30 control_tx: Sender<AgentSessionControl<S>>,
31}
32
33impl<S> AgentSession<S> {
34 fn new(agent_id: AgentId, session_id: AgentSessionId, domain: String, control_tx: Sender<AgentSessionControl<S>>) -> Self {
35 Self {
36 agent_id,
37 session_id,
38 domain,
39 control_tx,
40 }
41 }
42
43 pub fn agent_id(&self) -> AgentId {
44 self.agent_id
45 }
46}
47
48impl<S> Clone for AgentSession<S> {
49 fn clone(&self) -> Self {
50 Self {
51 agent_id: self.agent_id,
52 session_id: self.session_id,
53 domain: self.domain.clone(),
54 control_tx: self.control_tx.clone(),
55 }
56 }
57}
58
59pub enum AgentListenerEvent<C, S> {
60 Connected(AgentId, AgentSession<S>),
61 IncomingStream(AgentId, C, S),
62 Disconnected(AgentId, AgentSessionId),
63}
64
65pub trait AgentListener<C, S: AsyncRead + AsyncWrite> {
66 async fn recv(&mut self) -> anyhow::Result<AgentListenerEvent<C, S>>;
67 async fn shutdown(&mut self);
68}
69
70impl<S: AsyncRead + AsyncWrite + Send + Sync + 'static> AgentSession<S> {
71 pub fn session_id(&self) -> AgentSessionId {
72 self.session_id
73 }
74
75 pub fn domain(&self) -> &str {
76 &self.domain
77 }
78
79 pub async fn create_stream(&self) -> anyhow::Result<S> {
80 let (tx, rx) = oneshot::channel();
81 self.control_tx.send(AgentSessionControl::CreateStream(tx)).await?;
82 rx.await?
83 }
84}