1use crossbeam_channel::Sender;
2use log::{debug, info, warn};
3use std::{
4 collections::HashMap,
5 error::Error,
6 net::TcpListener,
7 sync::{Arc, RwLock},
8};
9use wasmcloud_actor_core::{CapabilityConfiguration, HealthCheckResponse};
10use wasmcloud_actor_telnet::{SendTextArgs, OP_SEND_TEXT};
11use wasmcloud_provider_core::{
12 capabilities::{CapabilityProvider, Dispatcher, NullDispatcher},
13 capability_provider,
14 core::{OP_BIND_ACTOR, OP_HEALTH_REQUEST, OP_REMOVE_ACTOR, SYSTEM_ACTOR},
15 deserialize, serialize,
16};
17mod server;
18mod session;
19
/// Capability contract identifier of this provider.
// Nothing in the visible code reads this constant, hence the lint allowance.
#[allow(dead_code)]
const CAPABILITY_ID: &str = "wasmcloud:telnet";
22
/// Result shared by the operation handlers: the serialized reply bytes on success,
/// or a boxed, thread-safe error on failure.
// `'static` is the default object lifetime bound for `Box<dyn Trait>`, so it is
// not spelled out here.
type MessageHandlerResult = Result<Vec<u8>, Box<dyn Error + Send + Sync>>;
24
25#[cfg(not(feature = "static_plugin"))]
26capability_provider!(TelnetProvider, TelnetProvider::new);
27
28#[derive(Clone)]
29pub struct TelnetProvider {
30 dispatcher: Arc<RwLock<Box<dyn Dispatcher>>>,
31 outbounds: Arc<RwLock<HashMap<String, Sender<String>>>>,
32 listeners: Arc<RwLock<HashMap<String, TcpListener>>>,
33}
34
35impl Default for TelnetProvider {
36 fn default() -> Self {
37 let _ = env_logger::try_init();
38
39 TelnetProvider {
40 dispatcher: Arc::new(RwLock::new(Box::new(NullDispatcher::new()))),
41 outbounds: Arc::new(RwLock::new(HashMap::new())),
42 listeners: Arc::new(RwLock::new(HashMap::new())),
43 }
44 }
45}
46
47impl TelnetProvider {
48 pub fn new() -> Self {
49 Self::default()
50 }
51
52 fn configure(&self, config: CapabilityConfiguration) -> MessageHandlerResult {
53 if let Some(listener) = self.listeners.read().unwrap().get(&config.module) {
54 debug!(
55 "Telnet session for actor {} already listening on {}",
56 listener.local_addr().unwrap(),
57 &config.module
58 );
59 return Ok(vec![]);
60 }
61
62 session::start_server(
63 config
64 .values
65 .get("MOTD")
66 .map_or_else(|| "".to_string(), |motd| motd.to_string()),
67 config
68 .values
69 .get("PORT")
70 .map_or_else(|| Ok(3000), |p| p.parse())?,
71 &config.module,
72 self.dispatcher.clone(),
73 self.outbounds.clone(),
74 self.listeners.clone(),
75 );
76 Ok(vec![])
77 }
78
79 fn deconfigure(&self, config: CapabilityConfiguration) -> MessageHandlerResult {
80 debug!("Shutting down telnet session for actor {}", &config.module);
81 if self
83 .listeners
84 .write()
85 .unwrap()
86 .remove(&config.module)
87 .is_none()
88 {
89 warn!(
90 "Attempted to deconfigure actor {}, but it was not configured",
91 &config.module
92 );
93 }
94 Ok(vec![])
95 }
96
97 fn send_text(
99 &self,
100 _actor: &str,
101 msg: SendTextArgs,
102 ) -> Result<Vec<u8>, Box<dyn Error + Sync + Send>> {
103 match self.outbounds.read().unwrap().get(&msg.session).clone() {
104 Some(outbound) => serialize(outbound.send(msg.text).is_ok()),
105 None => Err(format!("Socket is not present for session {}", &msg.session).into()),
106 }
107 }
108
109 fn health(&self) -> MessageHandlerResult {
110 Ok(serialize(HealthCheckResponse {
111 healthy: true,
112 message: "".to_string(),
113 })?)
114 }
115}
116
117impl CapabilityProvider for TelnetProvider {
118 fn configure_dispatch(
121 &self,
122 dispatcher: Box<dyn Dispatcher>,
123 ) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
124 info!("Dispatcher received.");
125 let mut lock = self.dispatcher.write().unwrap();
126 *lock = dispatcher;
127
128 Ok(())
129 }
130
131 fn handle_call(
134 &self,
135 actor: &str,
136 op: &str,
137 msg: &[u8],
138 ) -> Result<Vec<u8>, Box<dyn std::error::Error + Send + Sync>> {
139 debug!("Received host call from {}, operation - {}", actor, op);
140
141 match op {
142 OP_BIND_ACTOR if actor == SYSTEM_ACTOR => self.configure(deserialize(msg)?),
143 OP_REMOVE_ACTOR if actor == SYSTEM_ACTOR => self.deconfigure(deserialize(msg)?),
144 OP_HEALTH_REQUEST if actor == SYSTEM_ACTOR => self.health(),
145 OP_SEND_TEXT => self.send_text(actor, deserialize(msg)?),
146 _ => Err("bad dispatch".into()),
147 }
148 }
149
150 fn stop(&self) {
151 }
153}