use error::ClientError;
use parking_lot::RwLock;
use serde::Deserialize;
use serde_json::Map;
use std::{collections::HashMap, sync::Arc, time::Duration};
use timada::{pikav_client::PikavClient, PublishRequest, Struct, SubscribeReply, UnsubscribeReply};
use tokio::time::{interval_at, sleep, Instant};
use tonic::transport::Channel;
use tracing::error;
use url::Url;
pub use timada::{value::Kind, Event, ListValue, SubscribeRequest, UnsubscribeRequest, Value};
pub use tonic::Status;
mod error;
pub mod timada {
tonic::include_proto!("timada");
}
impl From<Value> for serde_json::Value {
fn from(value: Value) -> Self {
match value.kind {
Some(kind) => match kind {
Kind::DoubleValue(value) => serde_json::value::Number::from_f64(value)
.map(serde_json::Value::Number)
.unwrap_or(serde_json::Value::Null),
Kind::FloatValue(value) => serde_json::value::Number::from_f64(value.into())
.map(serde_json::Value::Number)
.unwrap_or(serde_json::Value::Null),
Kind::Int32Value(value) => {
serde_json::Value::Number(serde_json::value::Number::from(value))
}
Kind::Int64Value(value) => {
serde_json::Value::Number(serde_json::value::Number::from(value))
}
Kind::Uint32Value(value) => {
serde_json::Value::Number(serde_json::value::Number::from(value))
}
Kind::Uint64Value(value) => {
serde_json::Value::Number(serde_json::value::Number::from(value))
}
Kind::Sint32Value(value) => {
serde_json::Value::Number(serde_json::value::Number::from(value))
}
Kind::Sint64Value(value) => {
serde_json::Value::Number(serde_json::value::Number::from(value))
}
Kind::Fixed32Value(value) => {
serde_json::Value::Number(serde_json::value::Number::from(value))
}
Kind::Fixed64Value(value) => {
serde_json::Value::Number(serde_json::value::Number::from(value))
}
Kind::Sfixed32Value(value) => {
serde_json::Value::Number(serde_json::value::Number::from(value))
}
Kind::Sfixed64Value(value) => {
serde_json::Value::Number(serde_json::value::Number::from(value))
}
Kind::BoolValue(value) => serde_json::Value::Bool(value),
Kind::StringValue(value) => serde_json::Value::String(value),
Kind::ListValue(value) => {
serde_json::Value::Array(value.values.into_iter().map(|v| v.into()).collect())
}
Kind::StructValue(value) => {
let mut fields = Map::new();
for (key, value) in &value.fields {
fields.insert(key.to_owned(), value.clone().into());
}
serde_json::Value::Object(fields)
}
},
None => serde_json::Value::Null,
}
}
}
impl From<serde_json::Value> for Value {
fn from(value: serde_json::Value) -> Self {
match value {
serde_json::Value::Null => Value { kind: None },
serde_json::Value::Bool(v) => Value {
kind: Some(Kind::BoolValue(v)),
},
serde_json::Value::Number(v) => {
if let Some(v) = v.as_f64() {
return Value {
kind: Some(Kind::DoubleValue(v)),
};
}
if let Some(v) = v.as_i64() {
return Value {
kind: Some(Kind::Int64Value(v)),
};
}
if let Some(v) = v.as_u64() {
return Value {
kind: Some(Kind::Uint64Value(v)),
};
}
Value { kind: None }
}
serde_json::Value::String(v) => Value {
kind: Some(Kind::StringValue(v)),
},
serde_json::Value::Array(values) => Value {
kind: Some(Kind::ListValue(ListValue {
values: values.into_iter().map(|v| v.into()).collect(),
})),
},
serde_json::Value::Object(v) => {
let mut fields = HashMap::new();
for (key, value) in &v {
fields.insert(key.to_owned(), value.clone().into());
}
Value {
kind: Some(Kind::StructValue(Struct { fields })),
}
}
}
}
}
#[derive(Debug, Clone, Deserialize)]
pub struct ClientOptions<N: Into<String>> {
pub url: String,
pub namespace: N,
}
#[derive(Debug, Clone, Deserialize)]
pub struct ClientInstanceOptions {
pub url: String,
pub namespace: Option<String>,
}
#[derive(Clone)]
pub struct Client {
channel: Channel,
queue: Arc<RwLock<Vec<Event>>>,
namespace: Option<String>,
pub same_region: bool,
}
impl Client {
pub fn from_vec<T: Into<String>>(values: Vec<T>) -> Result<Vec<Self>, Vec<ClientError>> {
let mut clients = Vec::new();
let mut errors = Vec::new();
for value in values {
match Self::new_instance(ClientInstanceOptions {
url: value.into(),
namespace: None,
}) {
Ok(client) => clients.push(client),
Err(e) => errors.push(e),
}
}
if !errors.is_empty() {
return Err(errors);
}
Ok(clients)
}
pub fn new<N: Into<String>>(options: ClientOptions<N>) -> Result<Self, ClientError> {
Self::new_instance(ClientInstanceOptions {
url: options.url,
namespace: Some(options.namespace.into()),
})
}
fn new_instance(options: ClientInstanceOptions) -> Result<Self, ClientError> {
let parsed_url =
Url::parse(options.url.as_str()).map_err(|e| ClientError::Unknown(e.to_string()))?;
let query: HashMap<_, _> = parsed_url.query_pairs().into_owned().collect();
let channel = Channel::from_shared(options.url.to_owned())
.map_err(|e| ClientError::Unknown(e.to_string()))?
.connect_lazy();
let same_region = query
.get("same_region")
.map(|r| r == "true")
.unwrap_or(false);
let client = Self {
channel,
queue: Arc::new(RwLock::new(Vec::new())),
namespace: options.namespace,
same_region,
};
Self::spawn_queue(client.clone());
Ok(client)
}
fn spawn_queue(me: Self) {
tokio::spawn(async move {
let mut interval = interval_at(Instant::now(), Duration::from_millis(300));
loop {
interval.tick().await;
let events = {
let queue = me.queue.read();
if queue.len() == 0 {
continue;
}
let mut events = Vec::new();
for event in queue.iter().take(1000) {
let mut event = event.clone();
if let Some(namespace) = &me.namespace {
event.topic = format!("{}/{}", namespace, event.topic)
}
events.push(event.clone());
}
events
};
if events.is_empty() {
continue;
}
let event_size = events.len();
let mut client = PikavClient::new(me.channel.clone());
let request = tonic::Request::new(PublishRequest {
propagate: me.namespace.is_some(),
events,
});
if let Err(e) = client.publish(request).await {
error!("{e}");
sleep(Duration::from_secs(1)).await;
continue;
}
{
let mut queue = me.queue.write();
queue.drain(0..event_size);
}
}
});
}
pub fn publish(&self, events: Vec<Event>) {
let mut queue = self.queue.write();
queue.extend(events);
}
pub async fn subscribe(
&self,
message: SubscribeRequest,
) -> Result<tonic::Response<SubscribeReply>, Status> {
let mut client = PikavClient::new(self.channel.clone());
let request = tonic::Request::new(message);
client.subscribe(request).await
}
pub async fn unsubscribe(
&self,
message: UnsubscribeRequest,
) -> Result<tonic::Response<UnsubscribeReply>, Status> {
let mut client = PikavClient::new(self.channel.clone());
let request = tonic::Request::new(message);
client.unsubscribe(request).await
}
}