wasmcloud_telnet/
lib.rs

1use crossbeam_channel::Sender;
2use log::{debug, info, warn};
3use std::{
4    collections::HashMap,
5    error::Error,
6    net::TcpListener,
7    sync::{Arc, RwLock},
8};
9use wasmcloud_actor_core::{CapabilityConfiguration, HealthCheckResponse};
10use wasmcloud_actor_telnet::{SendTextArgs, OP_SEND_TEXT};
11use wasmcloud_provider_core::{
12    capabilities::{CapabilityProvider, Dispatcher, NullDispatcher},
13    capability_provider,
14    core::{OP_BIND_ACTOR, OP_HEALTH_REQUEST, OP_REMOVE_ACTOR, SYSTEM_ACTOR},
15    deserialize, serialize,
16};
17mod server;
18mod session;
19
20#[allow(dead_code)]
21const CAPABILITY_ID: &str = "wasmcloud:telnet";
22
23type MessageHandlerResult = Result<Vec<u8>, Box<dyn Error + Send + Sync + 'static>>;
24
25#[cfg(not(feature = "static_plugin"))]
26capability_provider!(TelnetProvider, TelnetProvider::new);
27
28#[derive(Clone)]
29pub struct TelnetProvider {
30    dispatcher: Arc<RwLock<Box<dyn Dispatcher>>>,
31    outbounds: Arc<RwLock<HashMap<String, Sender<String>>>>,
32    listeners: Arc<RwLock<HashMap<String, TcpListener>>>,
33}
34
35impl Default for TelnetProvider {
36    fn default() -> Self {
37        let _ = env_logger::try_init();
38
39        TelnetProvider {
40            dispatcher: Arc::new(RwLock::new(Box::new(NullDispatcher::new()))),
41            outbounds: Arc::new(RwLock::new(HashMap::new())),
42            listeners: Arc::new(RwLock::new(HashMap::new())),
43        }
44    }
45}
46
47impl TelnetProvider {
48    pub fn new() -> Self {
49        Self::default()
50    }
51
52    fn configure(&self, config: CapabilityConfiguration) -> MessageHandlerResult {
53        if let Some(listener) = self.listeners.read().unwrap().get(&config.module) {
54            debug!(
55                "Telnet session for actor {} already listening on {}",
56                listener.local_addr().unwrap(),
57                &config.module
58            );
59            return Ok(vec![]);
60        }
61
62        session::start_server(
63            config
64                .values
65                .get("MOTD")
66                .map_or_else(|| "".to_string(), |motd| motd.to_string()),
67            config
68                .values
69                .get("PORT")
70                .map_or_else(|| Ok(3000), |p| p.parse())?,
71            &config.module,
72            self.dispatcher.clone(),
73            self.outbounds.clone(),
74            self.listeners.clone(),
75        );
76        Ok(vec![])
77    }
78
79    fn deconfigure(&self, config: CapabilityConfiguration) -> MessageHandlerResult {
80        debug!("Shutting down telnet session for actor {}", &config.module);
81        // Remove actor session, shutdown TCP listener
82        if self
83            .listeners
84            .write()
85            .unwrap()
86            .remove(&config.module)
87            .is_none()
88        {
89            warn!(
90                "Attempted to deconfigure actor {}, but it was not configured",
91                &config.module
92            );
93        }
94        Ok(vec![])
95    }
96
97    /// Sends a text message to the appropriate socket
98    fn send_text(
99        &self,
100        _actor: &str,
101        msg: SendTextArgs,
102    ) -> Result<Vec<u8>, Box<dyn Error + Sync + Send>> {
103        match self.outbounds.read().unwrap().get(&msg.session).clone() {
104            Some(outbound) => serialize(outbound.send(msg.text).is_ok()),
105            None => Err(format!("Socket is not present for session {}", &msg.session).into()),
106        }
107    }
108
109    fn health(&self) -> MessageHandlerResult {
110        Ok(serialize(HealthCheckResponse {
111            healthy: true,
112            message: "".to_string(),
113        })?)
114    }
115}
116
117impl CapabilityProvider for TelnetProvider {
118    // Invoked by the runtime host to give this provider plugin the ability to communicate
119    // with actors
120    fn configure_dispatch(
121        &self,
122        dispatcher: Box<dyn Dispatcher>,
123    ) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
124        info!("Dispatcher received.");
125        let mut lock = self.dispatcher.write().unwrap();
126        *lock = dispatcher;
127
128        Ok(())
129    }
130
131    // Invoked by host runtime to allow an actor to make use of the capability
132    // All providers MUST handle the "configure" message, even if no work will be done
133    fn handle_call(
134        &self,
135        actor: &str,
136        op: &str,
137        msg: &[u8],
138    ) -> Result<Vec<u8>, Box<dyn std::error::Error + Send + Sync>> {
139        debug!("Received host call from {}, operation - {}", actor, op);
140
141        match op {
142            OP_BIND_ACTOR if actor == SYSTEM_ACTOR => self.configure(deserialize(msg)?),
143            OP_REMOVE_ACTOR if actor == SYSTEM_ACTOR => self.deconfigure(deserialize(msg)?),
144            OP_HEALTH_REQUEST if actor == SYSTEM_ACTOR => self.health(),
145            OP_SEND_TEXT => self.send_text(actor, deserialize(msg)?),
146            _ => Err("bad dispatch".into()),
147        }
148    }
149
150    fn stop(&self) {
151        /* nothing to do */
152    }
153}