#[macro_use]
extern crate wasmcloud_provider_core as codec;
use wasmcloud_actor_core as actor;
use wasmcloud_actor_eventstreams as eventstreams;
#[macro_use]
extern crate log;
use ::redis_streams::{
Client, Connection, ErrorKind, RedisError, RedisResult, StreamCommands, Value,
};
use actor::{CapabilityConfiguration, HealthCheckResponse};
use codec::core::{OP_BIND_ACTOR, OP_HEALTH_REQUEST, OP_REMOVE_ACTOR};
use codec::{
capabilities::{CapabilityProvider, Dispatcher, NullDispatcher},
core::SYSTEM_ACTOR,
};
use codec::{deserialize, serialize};
use eventstreams::*;
use std::error::Error;
use std::{
collections::HashMap,
sync::{Arc, RwLock},
};
/// Capability contract identifier for the event streams capability.
// Not referenced anywhere in the visible code, hence the `unused` allowance.
#[allow(unused)]
const CAPABILITY_ID: &str = "wasmcloud:eventstreams";
// Only compiled when the `static_plugin` feature is disabled.
// NOTE(review): presumably this macro emits the entry point a host uses to
// load the provider dynamically, built via `RedisStreamsProvider::new` — confirm
// against the `wasmcloud_provider_core` documentation.
#[cfg(not(feature = "static_plugin"))]
capability_provider!(RedisStreamsProvider, RedisStreamsProvider::new);
/// Capability provider that serves event stream operations from Redis streams.
///
/// Both fields sit behind `Arc<RwLock<..>>`, so clones of the provider share
/// the same dispatcher and the same client table.
#[derive(Clone)]
pub struct RedisStreamsProvider {
/// Dispatcher installed through `configure_dispatch`; a `NullDispatcher`
/// until then.
dispatcher: Arc<RwLock<Box<dyn Dispatcher>>>,
/// Redis clients keyed by the `module` field of the configuration each
/// actor was bound with.
clients: Arc<RwLock<HashMap<String, Client>>>,
}
impl Default for RedisStreamsProvider {
fn default() -> Self {
match env_logger::try_init() {
Ok(_) => {}
Err(_) => {}
};
RedisStreamsProvider {
dispatcher: Arc::new(RwLock::new(Box::new(NullDispatcher::new()))),
clients: Arc::new(RwLock::new(HashMap::new())),
}
}
}
impl RedisStreamsProvider {
    /// Creates a provider in its default state (see the `Default` impl).
    pub fn new() -> Self {
        Self::default()
    }

    /// Registers a Redis client for the actor named by `config.module`.
    ///
    /// If a client is already registered under that name nothing happens.
    /// Returns an empty payload on success; client construction errors are
    /// propagated.
    fn configure(
        &self,
        config: CapabilityConfiguration,
    ) -> Result<Vec<u8>, Box<dyn Error + Send + Sync>> {
        // The read guard is released at the end of this statement, before the
        // write lock below is requested.
        let already_bound = self.clients.read().unwrap().contains_key(&config.module);
        if !already_bound {
            let client = initialize_client(config.clone())?;
            self.clients.write().unwrap().insert(config.module, client);
        }
        Ok(vec![])
    }

    /// Drops the client registered under `actor`, if there is one.
    ///
    /// Always returns an empty payload.
    fn deconfigure(&self, actor: &str) -> Result<Vec<u8>, Box<dyn Error + Sync + Send>> {
        let mut clients = self.clients.write().unwrap();
        clients.remove(actor);
        Ok(vec![])
    }

    /// Opens a connection using the client registered for `actor`.
    ///
    /// Fails with `ErrorKind::InvalidClientConfig` when no client is
    /// registered under that name.
    fn actor_con(&self, actor: &str) -> RedisResult<Connection> {
        let clients = self.clients.read().unwrap();
        match clients.get(actor) {
            Some(client) => client.get_connection(),
            None => Err(RedisError::from((
                ErrorKind::InvalidClientConfig,
                "No client for this actor. Did the host configure it?",
            ))),
        }
    }

    /// Appends the key/value pairs in `args.values` to the stream
    /// `args.stream_id`, passing `"*"` as the entry ID.
    ///
    /// A failure to obtain a connection is returned as `Err`. A failure of
    /// the `xadd` call itself is reported inside the serialized `EventAck`
    /// (`error` set, `event_id` empty).
    fn write_event(
        &self,
        actor: &str,
        args: WriteEventArgs,
    ) -> Result<Vec<u8>, Box<dyn Error + Sync + Send>> {
        let fields = map_to_tuples(args.values);
        let outcome = self.actor_con(actor)?.xadd(args.stream_id, "*", &fields);
        let ack = match outcome {
            Err(e) => EventAck {
                error: Some(format!("{}", e)),
                event_id: None,
            },
            Ok(id) => EventAck {
                error: None,
                event_id: Some(id),
            },
        };
        Ok(serialize(ack)?)
    }

    /// Reads entries from the stream `query.stream_id` and returns them as a
    /// serialized `EventList`.
    ///
    /// * `query.range` — when present, its `min_time`/`max_time` bound the
    ///   read; otherwise the bounds `"-"` and `"+"` are used.
    /// * `query.count` — when greater than zero it is passed as the entry
    ///   limit; otherwise no limit is sent.
    fn query_stream(
        &self,
        actor: &str,
        query: StreamQuery,
    ) -> Result<Vec<u8>, Box<dyn Error + Sync + Send>> {
        let stream_name = query.stream_id.to_string();

        let reply = match (query.range, query.count > 0) {
            (Some(window), true) => self.actor_con(actor)?.xrange_count(
                query.stream_id,
                window.min_time,
                window.max_time,
                query.count,
            )?,
            (Some(window), false) => self.actor_con(actor)?.xrange(
                query.stream_id,
                window.min_time,
                window.max_time,
            )?,
            (None, true) => self
                .actor_con(actor)?
                .xrange_count(query.stream_id, "-", "+", query.count)?,
            (None, false) => self.actor_con(actor)?.xrange(query.stream_id, "-", "+")?,
        };

        // Convert each returned entry into an `Event`, stringifying its fields.
        let events = reply
            .ids
            .into_iter()
            .map(|entry| {
                let values = entry
                    .map
                    .iter()
                    .map(|(field, raw)| (field.to_string(), val_to_string(raw)))
                    .collect::<HashMap<String, String>>();
                Event {
                    event_id: entry.id,
                    stream_id: stream_name.to_string(),
                    values,
                }
            })
            .collect();

        Ok(serialize(EventList { events })?)
    }
}
impl CapabilityProvider for RedisStreamsProvider {
fn configure_dispatch(
&self,
dispatcher: Box<dyn Dispatcher>,
) -> Result<(), Box<dyn Error + Send + Sync>> {
trace!("Dispatcher received.");
let mut lock = self.dispatcher.write().unwrap();
*lock = dispatcher;
Ok(())
}
fn handle_call(
&self,
actor: &str,
op: &str,
msg: &[u8],
) -> Result<Vec<u8>, Box<dyn Error + Send + Sync>> {
trace!("Received host call from {}, operation - {}", actor, op);
match op {
OP_BIND_ACTOR if actor == SYSTEM_ACTOR => self.configure(deserialize(msg)?),
OP_REMOVE_ACTOR if actor == SYSTEM_ACTOR => self.deconfigure(actor),
OP_HEALTH_REQUEST if actor == SYSTEM_ACTOR => Ok(serialize(HealthCheckResponse {
healthy: true,
message: "".to_string(),
})?),
OP_WRITE_EVENT => self.write_event(actor, deserialize(msg)?),
OP_QUERY_STREAM => self.query_stream(actor, deserialize(msg)?),
_ => Err("bad dispatch".into()),
}
}
fn stop(&self) {}
}
/// Key looked up in an actor's configuration values to find the Redis URL.
const ENV_REDIS_URL: &str = "URL";
/// Builds a Redis client from an actor's binding configuration.
///
/// The URL is read from `config.values` under `ENV_REDIS_URL`; when the key
/// is absent, `redis://0.0.0.0:6379/` is used.
///
/// # Errors
///
/// Returns a "Failed to connect to redis" message wrapping the underlying
/// error when `Client::open` rejects the URL.
fn initialize_client(
    config: CapabilityConfiguration,
) -> Result<Client, Box<dyn Error + Send + Sync>> {
    let redis_url = config
        .values
        .get(ENV_REDIS_URL)
        .map(String::as_str)
        .unwrap_or("redis://0.0.0.0:6379/")
        .to_string();

    info!(
        "Attempting to connect {} to Redis at {}",
        config.module, redis_url
    );

    Client::open(redis_url.as_ref())
        .map_err(|e| format!("Failed to connect to redis: {}", e).into())
}
/// Flattens a map into a vector of owned `(key, value)` pairs.
///
/// The pairs come out in the map's iteration order, which is unspecified.
fn map_to_tuples(map: HashMap<String, String>) -> Vec<(String, String)> {
    let mut pairs = Vec::with_capacity(map.len());
    pairs.extend(map);
    pairs
}
/// Renders a Redis value as a `String`.
///
/// `Value::Data` payloads are decoded as UTF-8. Invalid byte sequences are
/// replaced with U+FFFD rather than panicking, so one binary field in a
/// stream entry cannot take down the provider. Every other variant is
/// rendered as the placeholder `"??"`.
fn val_to_string(val: &Value) -> String {
    match val {
        Value::Data(bytes) => String::from_utf8_lossy(bytes).into_owned(),
        _ => "??".to_string(),
    }
}
#[cfg(test)]
mod test {
    use super::*;
    use redis_streams::Commands;
    use std::collections::HashMap;

    /// Actor name the provider is bound to during the test.
    const ACTOR: &str = "testing-actor";
    /// Stream written to and read back by the test.
    const STREAM: &str = "my-stream";

    /// Writes six events through `handle_call`, reads them back with an
    /// unbounded query and checks the count and one field value.
    ///
    /// Requires a Redis server reachable at the URL from `gen_config`.
    #[test]
    fn round_trip() {
        let provider = RedisStreamsProvider::new();
        let binding = CapabilityConfiguration {
            module: ACTOR.to_string(),
            values: gen_config(),
        };

        // Separate client used only to clear the stream before and after.
        let client = initialize_client(binding.clone()).unwrap();
        let _removed: bool = client.get_connection().unwrap().del(STREAM).unwrap();

        provider.configure(binding).unwrap();

        for _ in 0..6 {
            let request = serialize(&WriteEventArgs {
                stream_id: STREAM.to_string(),
                values: gen_values(),
            })
            .unwrap();
            let reply = provider
                .handle_call(ACTOR, OP_WRITE_EVENT, &request)
                .unwrap();
            let ack: EventAck = deserialize(&reply).unwrap();
            assert!(ack.event_id.is_some());
        }

        let request = serialize(&StreamQuery {
            count: 0,
            range: None,
            stream_id: STREAM.to_string(),
        })
        .unwrap();
        let reply = provider
            .handle_call(ACTOR, OP_QUERY_STREAM, &request)
            .unwrap();
        let listing: EventList = deserialize(reply.as_ref()).unwrap();

        assert_eq!(6, listing.events.len());
        assert_eq!(listing.events[0].values["scruffy-looking"], "nerf-herder");

        let _removed: bool = client.get_connection().unwrap().del(STREAM).unwrap();
    }

    /// Binding values pointing the provider at the test Redis URL.
    fn gen_config() -> HashMap<String, String> {
        vec![("URL".to_string(), "redis://0.0.0.0:6379/".to_string())]
            .into_iter()
            .collect()
    }

    /// Field/value pairs written with every test event.
    fn gen_values() -> HashMap<String, String> {
        vec![
            ("test".to_string(), "ok".to_string()),
            ("scruffy-looking".to_string(), "nerf-herder".to_string()),
        ]
        .into_iter()
        .collect()
    }
}