webterm_agent/models/
runner.rs

1use crate::config::Config;
2use crate::messaging::process_r2a::process_r2a;
3use crate::models::activity_registry::ActivityRegistry;
4use crate::models::agent_error::AgentError;
5use crate::models::connection_manager::ConnectionManager;
6use crate::models::panic_error::PanicError;
7use crate::models::pty_activity_reader::PtyActivityReader;
8use crate::models::send_payload::SendPayload;
9use crate::models::socket_reader::SocketSubscriber;
10use crate::models::socket_writer::SocketPublisher;
11use std::sync::Arc;
12use tracing::{debug, error, info};
13use webterm_core::serialisers::talk_v1::a2f_builder::A2fBuilder;
14
/// Stateless handle whose `run` method drives the relay-to-agent and
/// agent-to-relay tasks.
pub struct Runner {}
16
17impl Runner {
18    pub fn new() -> Self {
19        Self {}
20    }
21
22    pub async fn run(self, config: Arc<Config>) -> Result<(), PanicError> {
23        let cm = ConnectionManager::new(config.clone()).await;
24
25        loop {
26            if let Some((relay_pub, relay_sub)) = cm.pub_sub().await {
27                let r2a_task = tokio::spawn(Self::r2a_task(
28                    relay_sub,
29                    relay_pub.clone(),
30                    cm.clone(),
31                    config.clone(),
32                ));
33
34                let a2r_task = tokio::spawn(Self::a2r_task(relay_pub.clone()));
35
36                tokio::select! {
37                    result = r2a_task => {
38                        match result {
39                            Ok(Ok(())) => {
40                                info!("r2a_task exited with ok()");
41                            }
42                            Ok(Err(e)) => {
43                                error!("r2a_task error: {}", e);
44                            }
45                            Err(e) => {
46                                error!("r2a_task panic: {}", e);
47                            }
48                        }
49                    },
50                    result = a2r_task => {
51                        match result {
52                            Ok(Ok(())) => {
53                                info!("a2r_task exited with ok()");
54                            }
55                            Ok(Err(e)) => {
56                                error!("a2r_task error: {}", e);
57                            }
58                            Err(e) => {
59                                error!("a2r_task panic: {}", e);
60                            }
61                        }
62                    },
63                    _ = cm.wait_for_disconnect() => {
64                        info!("Received disconnect signal");
65                    }
66                }
67            } else {
68                cm.wait_for_connect().await;
69            }
70        }
71    }
72
73    async fn r2a_task(
74        mut relay_sub: SocketSubscriber,
75        relay_pub: SocketPublisher,
76        cm: Arc<ConnectionManager>,
77        config: Arc<Config>,
78    ) -> Result<(), AgentError> {
79        loop {
80            let data = relay_sub.recv().await;
81            match data {
82                Ok(Ok(Some(data))) => {
83                    let send = SendPayload::new();
84                    let send = process_r2a(&data, send, &config).await?;
85                    if send.is_relay_shutdown() {
86                        error!("Relay is shutting down");
87                        cm.disconnect().await;
88                        return Ok(());
89                    }
90                    send.dispatch(&relay_pub).await?
91                }
92                Err(e) => {
93                    cm.disconnect_with_error(e.into()).await;
94                    return Ok(());
95                }
96                _ => continue,
97            }
98        }
99    }
100
101    async fn a2r_task(relay_pub: SocketPublisher) -> Result<(), AgentError> {
102        let receiver = PtyActivityReader::receiver();
103
104        loop {
105            let output = receiver.lock().await.recv().await;
106            if let Some(output) = output {
107                let activity = ActivityRegistry::find(output.activity_id).await;
108                if let Ok(activity) = activity {
109                    let session = activity.parent_session().await;
110                    if let Ok(session) = session {
111                        let session = session.lock().await;
112                        let frontend = session.current_frontend();
113                        if let Ok(frontend) = frontend {
114                            let frontend = frontend.lock().await;
115                            let mut send = SendPayload::new();
116                            let a2f = A2fBuilder::new();
117                            let payload = a2f
118                                .build_activity_output(output.activity_id, &output.to_fb_output().0)
119                                .to_flatbuffers_encrypted(frontend.cryptographer()?)?;
120                            send.prepare_for_frontend(frontend.frontend_id(), payload);
121                            send.dispatch(&relay_pub).await?;
122                        } else {
123                            debug!(
124                                "frontend not found for session_id: {:?}",
125                                session.session_id()
126                            );
127                        }
128                    } else {
129                        debug!(
130                            "session not found for activity_id: {:?}",
131                            output.activity_id
132                        )
133                    }
134                } else {
135                    debug!(
136                        "activity not found for activity_id: {:?}",
137                        output.activity_id
138                    );
139                }
140            }
141        }
142    }
143}