webterm_agent/models/
runner.rs1use crate::config::Config;
2use crate::messaging::process_r2a::process_r2a;
3use crate::models::activity_registry::ActivityRegistry;
4use crate::models::agent_error::AgentError;
5use crate::models::connection_manager::ConnectionManager;
6use crate::models::panic_error::PanicError;
7use crate::models::pty_activity_reader::PtyActivityReader;
8use crate::models::send_payload::SendPayload;
9use crate::models::socket_reader::SocketSubscriber;
10use crate::models::socket_writer::SocketPublisher;
11use std::sync::Arc;
12use tracing::{debug, error, info};
13use webterm_core::serialisers::talk_v1::a2f_builder::A2fBuilder;
14
15pub struct Runner {}
16
17impl Runner {
18 pub fn new() -> Self {
19 Self {}
20 }
21
22 pub async fn run(self, config: Arc<Config>) -> Result<(), PanicError> {
23 let cm = ConnectionManager::new(config.clone()).await;
24
25 loop {
26 if let Some((relay_pub, relay_sub)) = cm.pub_sub().await {
27 let r2a_task = tokio::spawn(Self::r2a_task(
28 relay_sub,
29 relay_pub.clone(),
30 cm.clone(),
31 config.clone(),
32 ));
33
34 let a2r_task = tokio::spawn(Self::a2r_task(relay_pub.clone()));
35
36 tokio::select! {
37 result = r2a_task => {
38 match result {
39 Ok(Ok(())) => {
40 info!("r2a_task exited with ok()");
41 }
42 Ok(Err(e)) => {
43 error!("r2a_task error: {}", e);
44 }
45 Err(e) => {
46 error!("r2a_task panic: {}", e);
47 }
48 }
49 },
50 result = a2r_task => {
51 match result {
52 Ok(Ok(())) => {
53 info!("a2r_task exited with ok()");
54 }
55 Ok(Err(e)) => {
56 error!("a2r_task error: {}", e);
57 }
58 Err(e) => {
59 error!("a2r_task panic: {}", e);
60 }
61 }
62 },
63 _ = cm.wait_for_disconnect() => {
64 info!("Received disconnect signal");
65 }
66 }
67 } else {
68 cm.wait_for_connect().await;
69 }
70 }
71 }
72
73 async fn r2a_task(
74 mut relay_sub: SocketSubscriber,
75 relay_pub: SocketPublisher,
76 cm: Arc<ConnectionManager>,
77 config: Arc<Config>,
78 ) -> Result<(), AgentError> {
79 loop {
80 let data = relay_sub.recv().await;
81 match data {
82 Ok(Ok(Some(data))) => {
83 let send = SendPayload::new();
84 let send = process_r2a(&data, send, &config).await?;
85 if send.is_relay_shutdown() {
86 error!("Relay is shutting down");
87 cm.disconnect().await;
88 return Ok(());
89 }
90 send.dispatch(&relay_pub).await?
91 }
92 Err(e) => {
93 cm.disconnect_with_error(e.into()).await;
94 return Ok(());
95 }
96 _ => continue,
97 }
98 }
99 }
100
101 async fn a2r_task(relay_pub: SocketPublisher) -> Result<(), AgentError> {
102 let receiver = PtyActivityReader::receiver();
103
104 loop {
105 let output = receiver.lock().await.recv().await;
106 if let Some(output) = output {
107 let activity = ActivityRegistry::find(output.activity_id).await;
108 if let Ok(activity) = activity {
109 let session = activity.parent_session().await;
110 if let Ok(session) = session {
111 let session = session.lock().await;
112 let frontend = session.current_frontend();
113 if let Ok(frontend) = frontend {
114 let frontend = frontend.lock().await;
115 let mut send = SendPayload::new();
116 let a2f = A2fBuilder::new();
117 let payload = a2f
118 .build_activity_output(output.activity_id, &output.to_fb_output().0)
119 .to_flatbuffers_encrypted(frontend.cryptographer()?)?;
120 send.prepare_for_frontend(frontend.frontend_id(), payload);
121 send.dispatch(&relay_pub).await?;
122 } else {
123 debug!(
124 "frontend not found for session_id: {:?}",
125 session.session_id()
126 );
127 }
128 } else {
129 debug!(
130 "session not found for activity_id: {:?}",
131 output.activity_id
132 )
133 }
134 } else {
135 debug!(
136 "activity not found for activity_id: {:?}",
137 output.activity_id
138 );
139 }
140 }
141 }
142 }
143}