use std::collections::HashMap;
use std::sync::{Arc, Mutex};
use chrono::{DateTime, Duration, TimeZone, Utc};
use futures::TryFutureExt;
use serde_derive::{Deserialize, Serialize};
use stdng::{logs::TraceFn, trace_fn};
use tokio_stream::StreamExt;
use tonic::transport::Channel;
use tonic::transport::Endpoint;
use tonic::Request;
use self::rpc::frontend_client::FrontendClient as FlameFrontendClient;
use self::rpc::{
ApplicationSpec, CloseSessionRequest, CreateSessionRequest, CreateTaskRequest, Environment,
GetApplicationRequest, GetSessionRequest, GetTaskRequest, ListApplicationRequest,
ListExecutorRequest, ListSessionRequest, ListTaskRequest, RegisterApplicationRequest,
SessionSpec, TaskSpec, UnregisterApplicationRequest, UpdateApplicationRequest,
WatchTaskRequest,
};
use crate::apis::flame as rpc;
use crate::apis::Shim;
use crate::apis::{
ApplicationID, ApplicationState, CommonData, ExecutorState, FlameError, SessionID,
SessionState, TaskID, TaskInput, TaskOutput, TaskState,
};
use crate::lock_ptr;
/// The generated frontend gRPC client, specialised to a tonic [`Channel`] transport.
type FlameClient = FlameFrontendClient<Channel>;
/// Opens a transport channel to the Flame frontend at `addr`.
///
/// # Errors
///
/// Returns [`FlameError::InvalidConfig`] when `addr` cannot be turned into an
/// endpoint, or when the connection attempt itself fails. The message names
/// the offending address and includes the underlying error text so the
/// failure can be diagnosed.
pub async fn connect(addr: &str) -> Result<Connection, FlameError> {
    let endpoint = Endpoint::from_shared(addr.to_string()).map_err(|e| {
        FlameError::InvalidConfig(format!("invalid address <{}>: {}", addr, e))
    })?;

    let channel = endpoint.connect().await.map_err(|e| {
        FlameError::InvalidConfig(format!("failed to connect to <{}>: {}", addr, e))
    })?;

    Ok(Connection { channel })
}
/// A timestamped event, as carried in the `events` lists of [`Session`] and
/// [`Task`]. Built from an `rpc::Event`.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct Event {
/// Numeric event code, copied verbatim from the RPC event.
pub code: i32,
/// Optional message text accompanying the code.
pub message: Option<String>,
/// Creation time of the event; (de)serialized by `serde_utc` as whole Unix seconds.
#[serde(with = "serde_utc")]
pub creation_time: DateTime<Utc>,
}
/// Handle to a Flame frontend, produced by [`connect`].
///
/// It only stores the transport channel; every API call builds a fresh
/// `FlameClient` on a clone of that channel.
#[derive(Clone)]
pub struct Connection {
/// Transport channel established by [`connect`].
pub(crate) channel: Channel,
}
/// Parameters for `Connection::create_session`; each field is copied into the
/// `SessionSpec` of the create request.
#[derive(Clone, Serialize, Deserialize)]
pub struct SessionAttributes {
/// Name of the application the session is created for.
pub application: String,
/// Slot count sent as `SessionSpec::slots`.
pub slots: u32,
/// Optional common data sent along with the session; (de)serialized by `serde_message`.
#[serde(with = "serde_message")]
pub common_data: Option<CommonData>,
}
/// Client-side mirror of `rpc::ApplicationSchema`; converts field-for-field in
/// both directions.
// NOTE(review): presumably each string describes the schema of the task
// input, task output and session common data respectively — confirm.
#[derive(Clone, Serialize, Deserialize)]
pub struct ApplicationSchema {
pub input: Option<String>,
pub output: Option<String>,
pub common_data: Option<String>,
}
/// Client-side description of an application. Converts to and from the RPC
/// `ApplicationSpec`, field for field.
#[derive(Clone, Serialize, Deserialize)]
pub struct ApplicationAttributes {
/// Shim selection; converted to and from the RPC shim value.
pub shim: Shim,
pub image: Option<String>,
pub description: Option<String>,
pub labels: Vec<String>,
pub command: Option<String>,
pub arguments: Vec<String>,
/// Environment variables; sent over RPC as a list of `Environment { name, value }`.
pub environments: HashMap<String, String>,
pub working_directory: Option<String>,
pub max_instances: Option<u32>,
/// Sent over RPC and (de)serialized as a whole number of seconds
/// (sub-second precision is dropped).
#[serde(with = "serde_duration")]
pub delay_release: Option<Duration>,
pub schema: Option<ApplicationSchema>,
}
/// A registered application as reported by the frontend. Built from an
/// `rpc::Application`.
#[derive(Clone, Serialize, Deserialize)]
pub struct Application {
/// Application name, taken from the RPC metadata.
pub name: ApplicationID,
/// Attributes converted from the RPC application spec.
pub attributes: ApplicationAttributes,
/// State converted from the RPC application status.
pub state: ApplicationState,
/// Creation time from the RPC status; (de)serialized by `serde_utc` as whole Unix seconds.
#[serde(with = "serde_utc")]
pub creation_time: DateTime<Utc>,
}
/// An executor as reported by the frontend. Built from an `rpc::Executor`.
#[derive(Clone, Serialize, Deserialize)]
pub struct Executor {
/// Executor id, taken from the RPC metadata.
pub id: String,
/// State converted from the RPC executor status.
pub state: ExecutorState,
/// Session id from the RPC status, if one is set.
pub session_id: Option<String>,
/// Slot count from the RPC executor spec.
pub slots: u32,
/// Node name from the RPC executor spec.
pub node: String,
}
/// A session as reported by the frontend, optionally bound to a client so
/// that task operations can be issued on it.
#[derive(Clone, Serialize, Deserialize)]
pub struct Session {
/// gRPC client used by the session methods; never (de)serialized.
/// `Connection::create_session` and `Connection::get_session` set it, while
/// the plain `From<&rpc::Session>` conversion leaves it `None`. Session
/// methods fail with `FlameError::Internal("no flame client")` when it is `None`.
#[serde(skip)]
pub(crate) client: Option<FlameClient>,
/// Session id, taken from the RPC metadata.
pub id: SessionID,
pub slots: u32,
pub application: String,
/// Creation time from the RPC status; (de)serialized by `serde_utc` as whole Unix seconds.
#[serde(with = "serde_utc")]
pub creation_time: DateTime<Utc>,
pub state: SessionState,
// The four counters below are copied from the RPC session status.
// NOTE(review): presumably the number of tasks in each state — confirm.
pub pending: i32,
pub running: i32,
pub succeed: i32,
pub failed: i32,
pub events: Vec<Event>,
/// Always `None` when built by the `From<&rpc::Session>` conversion.
pub tasks: Option<Vec<Task>>,
}
/// A task as reported by the frontend. Built from an `rpc::Task`.
#[derive(Clone, Serialize, Deserialize)]
pub struct Task {
/// Task id, taken from the RPC metadata.
pub id: TaskID,
/// Id of the session the task belongs to, taken from the RPC task spec.
pub ssn_id: SessionID,
pub state: TaskState,
/// Task input from the RPC spec, if any; (de)serialized by `serde_message`.
#[serde(with = "serde_message")]
pub input: Option<TaskInput>,
/// Task output from the RPC spec, if any; (de)serialized by `serde_message`.
#[serde(with = "serde_message")]
pub output: Option<TaskOutput>,
pub events: Vec<Event>,
}
/// Shared, mutex-guarded [`TaskInformer`] handle, as taken by
/// `Session::run_task` and `Session::watch_task`.
pub type TaskInformerPtr = Arc<Mutex<dyn TaskInformer>>;
/// Callback interface driven by `Session::watch_task` while it consumes the
/// task update stream.
pub trait TaskInformer: Send + Sync + 'static {
/// Called for every task update successfully received from the stream.
fn on_update(&mut self, task: Task);
/// Called for every error item yielded by the stream.
fn on_error(&mut self, e: FlameError);
}
impl Task {
    /// Returns `true` when the task state is either `Succeed` or `Failed`.
    pub fn is_completed(&self) -> bool {
        self.is_succeed() || self.is_failed()
    }

    /// Returns `true` when the task state is `Succeed`.
    pub fn is_succeed(&self) -> bool {
        matches!(self.state, TaskState::Succeed)
    }

    /// Returns `true` when the task state is `Failed`.
    pub fn is_failed(&self) -> bool {
        matches!(self.state, TaskState::Failed)
    }
}
impl Connection {
pub async fn create_session(&self, attrs: &SessionAttributes) -> Result<Session, FlameError> {
trace_fn!("Connection::create_session");
let create_ssn_req = CreateSessionRequest {
session: Some(SessionSpec {
application: attrs.application.clone(),
slots: attrs.slots,
common_data: attrs.common_data.clone().map(CommonData::into),
}),
};
let mut client = FlameClient::new(self.channel.clone());
let ssn = client.create_session(create_ssn_req).await?;
let ssn = ssn.into_inner();
let mut ssn = Session::from(&ssn);
ssn.client = Some(client);
Ok(ssn)
}
pub async fn list_session(&self) -> Result<Vec<Session>, FlameError> {
let mut client = FlameClient::new(self.channel.clone());
let ssn_list = client.list_session(ListSessionRequest {}).await?;
Ok(ssn_list
.into_inner()
.sessions
.iter()
.map(Session::from)
.collect())
}
pub async fn get_session(&self, id: &SessionID) -> Result<Session, FlameError> {
let mut client = FlameClient::new(self.channel.clone());
let ssn = client
.get_session(GetSessionRequest {
session_id: id.to_string(),
})
.await?;
let ssn = ssn.into_inner();
let mut ssn = Session::from(&ssn);
ssn.client = Some(client);
Ok(ssn)
}
pub async fn register_application(
&self,
name: String,
app: ApplicationAttributes,
) -> Result<(), FlameError> {
let mut client = FlameClient::new(self.channel.clone());
let req = RegisterApplicationRequest {
name,
application: Some(ApplicationSpec::from(app)),
};
let res = client
.register_application(Request::new(req))
.await?
.into_inner();
if res.return_code < 0 {
Err(FlameError::Network(res.message.unwrap_or_default()))
} else {
Ok(())
}
}
pub async fn update_application(
&self,
name: String,
app: ApplicationAttributes,
) -> Result<(), FlameError> {
let mut client = FlameClient::new(self.channel.clone());
let req = UpdateApplicationRequest {
name,
application: Some(ApplicationSpec::from(app)),
};
let res = client
.update_application(Request::new(req))
.await?
.into_inner();
if res.return_code < 0 {
Err(FlameError::Network(res.message.unwrap_or_default()))
} else {
Ok(())
}
}
pub async fn unregister_application(&self, name: String) -> Result<(), FlameError> {
let mut client = FlameClient::new(self.channel.clone());
let req = UnregisterApplicationRequest { name };
let res = client
.unregister_application(Request::new(req))
.await?
.into_inner();
if res.return_code < 0 {
Err(FlameError::Network(res.message.unwrap_or_default()))
} else {
Ok(())
}
}
pub async fn list_application(&self) -> Result<Vec<Application>, FlameError> {
let mut client = FlameClient::new(self.channel.clone());
let app_list = client.list_application(ListApplicationRequest {}).await?;
Ok(app_list
.into_inner()
.applications
.iter()
.map(Application::from)
.collect())
}
pub async fn get_application(&self, name: &str) -> Result<Application, FlameError> {
let mut client = FlameClient::new(self.channel.clone());
let app = client
.get_application(GetApplicationRequest {
name: name.to_string(),
})
.await?;
Ok(Application::from(&app.into_inner()))
}
pub async fn list_executor(&self) -> Result<Vec<Executor>, FlameError> {
let mut client = FlameClient::new(self.channel.clone());
let executor_list = client.list_executor(ListExecutorRequest {}).await?;
Ok(executor_list
.into_inner()
.executors
.iter()
.map(Executor::from)
.collect())
}
}
impl Session {
pub async fn create_task(&self, input: Option<TaskInput>) -> Result<Task, FlameError> {
trace_fn!("Session::create_task");
let mut client = self
.client
.clone()
.ok_or(FlameError::Internal("no flame client".to_string()))?;
let create_task_req = CreateTaskRequest {
task: Some(TaskSpec {
session_id: self.id.clone(),
input: input.map(|input| input.to_vec()),
output: None,
}),
};
let task = client.create_task(create_task_req).await?;
let task = task.into_inner();
Ok(Task::from(&task))
}
pub async fn get_task(&self, id: &TaskID) -> Result<Task, FlameError> {
trace_fn!("Session::get_task");
let mut client = self
.client
.clone()
.ok_or(FlameError::Internal("no flame client".to_string()))?;
let get_task_req = GetTaskRequest {
session_id: self.id.clone(),
task_id: id.clone(),
};
let task = client.get_task(get_task_req).await?;
let task = task.into_inner();
Ok(Task::from(&task))
}
pub async fn list_tasks(&self) -> Result<Vec<Task>, FlameError> {
trace_fn!("Session::list_task");
let mut client = self
.client
.clone()
.ok_or(FlameError::Internal("no flame client".to_string()))?;
let task_stream = client
.list_task(Request::new(ListTaskRequest {
session_id: self.id.to_string(),
}))
.await?;
let mut task_list = vec![];
let mut task_stream = task_stream.into_inner();
while let Some(task) = task_stream.next().await {
if let Ok(t) = task {
task_list.push(Task::from(&t));
}
}
Ok(task_list)
}
pub async fn run_task(
&self,
input: Option<TaskInput>,
informer_ptr: TaskInformerPtr,
) -> Result<(), FlameError> {
trace_fn!("Session::run_task");
self.create_task(input)
.and_then(|task| self.watch_task(task.ssn_id.clone(), task.id, informer_ptr))
.await
}
pub async fn watch_task(
&self,
session_id: SessionID,
task_id: TaskID,
informer_ptr: TaskInformerPtr,
) -> Result<(), FlameError> {
trace_fn!("Session::watch_task");
let mut client = self
.client
.clone()
.ok_or(FlameError::Internal("no flame client".to_string()))?;
let watch_task_req = WatchTaskRequest {
session_id,
task_id,
};
let mut task_stream = client.watch_task(watch_task_req).await?.into_inner();
while let Some(task) = task_stream.next().await {
match task {
Ok(t) => {
let mut informer = lock_ptr!(informer_ptr)?;
informer.on_update(Task::from(&t));
}
Err(e) => {
let mut informer = lock_ptr!(informer_ptr)?;
informer.on_error(FlameError::from(e.clone()));
}
}
}
Ok(())
}
pub async fn close(&self) -> Result<(), FlameError> {
trace_fn!("Session::close");
let mut client = self
.client
.clone()
.ok_or(FlameError::Internal("no flame client".to_string()))?;
let close_ssn_req = CloseSessionRequest {
session_id: self.id.clone(),
};
client.close_session(close_ssn_req).await?;
Ok(())
}
}
/// Builds a client-side [`Task`] from its RPC representation.
///
/// # Panics
///
/// Panics when the RPC task lacks `metadata`, `spec` or `status`.
impl From<&rpc::Task> for Task {
    fn from(task: &rpc::Task) -> Self {
        // Borrow the three sections and clone only the fields that are kept.
        let metadata = task.metadata.as_ref().unwrap();
        let spec = task.spec.as_ref().unwrap();
        let status = task.status.as_ref().unwrap();

        // An unrecognised state value maps to the default state.
        let state = TaskState::try_from(status.state).unwrap_or_else(|_| TaskState::default());
        let events = status.events.iter().map(Event::from).collect();

        Self {
            id: metadata.id.clone(),
            ssn_id: spec.session_id.clone(),
            state,
            input: spec.input.clone().map(TaskInput::from),
            output: spec.output.clone().map(TaskOutput::from),
            events,
        }
    }
}
/// Builds a client-side [`Session`] from its RPC representation. The result
/// has no client attached and no task list.
///
/// # Panics
///
/// Panics when the RPC session lacks `metadata`, `status` or `spec`, or when
/// the creation time (in seconds) is outside the range chrono can represent.
impl From<&rpc::Session> for Session {
    fn from(ssn: &rpc::Session) -> Self {
        let metadata = ssn.metadata.clone().unwrap();
        let status = ssn.status.clone().unwrap();
        let spec = ssn.spec.clone().unwrap();

        // `creation_time` is in seconds: build the timestamp from it directly
        // rather than multiplying by 1000, which could overflow `i64`.
        let creation_time = Utc
            .timestamp_opt(status.creation_time, 0)
            .single()
            .expect("session creation_time is out of the representable range");

        Session {
            client: None,
            id: metadata.id,
            slots: spec.slots,
            application: spec.application,
            creation_time,
            // An unrecognised state value maps to the default state.
            state: SessionState::try_from(status.state).unwrap_or(SessionState::default()),
            pending: status.pending,
            running: status.running,
            succeed: status.succeed,
            failed: status.failed,
            events: status.events.iter().map(Event::from).collect(),
            tasks: None,
        }
    }
}
/// Builds a client-side [`Event`] from its RPC representation, whose
/// `creation_time` is a Unix timestamp in milliseconds.
///
/// # Panics
///
/// Panics when the timestamp is outside the range chrono can represent.
impl From<&rpc::Event> for Event {
    fn from(event: &rpc::Event) -> Self {
        // `from_timestamp_millis` splits the value with Euclidean division, so
        // negative (pre-1970) timestamps are handled too. A manual `/` and `%`
        // split yields a negative remainder there, which is not a valid
        // nanosecond count.
        let creation_time = DateTime::from_timestamp_millis(event.creation_time)
            .expect("event creation_time is out of the representable range");

        Self {
            code: event.code,
            message: event.message.clone(),
            creation_time,
        }
    }
}
/// Owned-value convenience conversion; delegates to the by-reference impl.
impl From<rpc::Event> for Event {
    fn from(event: rpc::Event) -> Self {
        let borrowed = &event;
        Self::from(borrowed)
    }
}
/// Builds a client-side [`Application`] from its RPC representation.
///
/// # Panics
///
/// Panics when the RPC application lacks `metadata`, `spec` or `status`, or
/// when the creation time (in seconds) is outside the range chrono can
/// represent.
impl From<&rpc::Application> for Application {
    fn from(app: &rpc::Application) -> Self {
        let metadata = app.metadata.clone().unwrap();
        let spec = app.spec.clone().unwrap();
        let status = app.status.as_ref().unwrap();

        // `creation_time` is in seconds: build the timestamp from it directly
        // rather than multiplying by 1000, which could overflow `i64`.
        let creation_time = Utc
            .timestamp_opt(status.creation_time, 0)
            .single()
            .expect("application creation_time is out of the representable range");

        Self {
            name: metadata.name,
            attributes: ApplicationAttributes::from(spec),
            state: ApplicationState::from(status.state()),
            creation_time,
        }
    }
}
impl From<ApplicationAttributes> for ApplicationSpec {
fn from(app: ApplicationAttributes) -> Self {
Self {
shim: app.shim.into(),
image: app.image.clone(),
description: app.description.clone(),
labels: app.labels.clone(),
command: app.command.clone(),
arguments: app.arguments.clone(),
environments: app
.environments
.clone()
.into_iter()
.map(|(key, value)| Environment { name: key, value })
.collect(),
working_directory: app.working_directory.clone(),
max_instances: app.max_instances,
delay_release: app.delay_release.map(|s| s.num_seconds()),
schema: app.schema.clone().map(rpc::ApplicationSchema::from),
}
}
}
/// Converts an RPC application spec into client-side attributes.
///
/// `delay_release` is read as a number of seconds and the environment list is
/// collected into a name-to-value map.
impl From<ApplicationSpec> for ApplicationAttributes {
    fn from(app: ApplicationSpec) -> Self {
        // Resolve the shim first: the accessor borrows `app` as a whole, which
        // must happen before its fields are moved out below.
        let shim = app.shim().into();
        let delay_release = app.delay_release.map(Duration::seconds);
        let environments = app
            .environments
            .into_iter()
            .map(|env| (env.name, env.value))
            .collect();

        Self {
            shim,
            image: app.image,
            description: app.description,
            labels: app.labels,
            command: app.command,
            arguments: app.arguments,
            environments,
            working_directory: app.working_directory,
            max_instances: app.max_instances,
            delay_release,
            schema: app.schema.map(ApplicationSchema::from),
        }
    }
}
impl From<ApplicationSchema> for rpc::ApplicationSchema {
fn from(schema: ApplicationSchema) -> Self {
Self {
input: schema.input,
output: schema.output,
common_data: schema.common_data,
}
}
}
/// Field-for-field conversion of the RPC schema into its client-side form.
impl From<rpc::ApplicationSchema> for ApplicationSchema {
    fn from(schema: rpc::ApplicationSchema) -> Self {
        let rpc::ApplicationSchema {
            input,
            output,
            common_data,
        } = schema;

        Self {
            input,
            output,
            common_data,
        }
    }
}
/// Builds a client-side [`Executor`] from its RPC representation.
///
/// # Panics
///
/// Panics when the RPC executor lacks `spec`, `status` or `metadata`.
impl From<&rpc::Executor> for Executor {
    fn from(e: &rpc::Executor) -> Self {
        let spec = e.spec.clone().unwrap();
        let status = e.status.clone().unwrap();
        let metadata = e.metadata.clone().unwrap();

        // An unrecognised state value (e.g. from a newer server) maps to the
        // default state, as the task and session conversions do, instead of
        // panicking.
        let state = rpc::ExecutorState::try_from(status.state)
            .unwrap_or_default()
            .into();

        Executor {
            id: metadata.id,
            session_id: status.session_id,
            slots: spec.slots,
            node: spec.node,
            state,
        }
    }
}
/// Owned-value convenience conversion; delegates to the by-reference impl.
impl From<rpc::Executor> for Executor {
    fn from(e: rpc::Executor) -> Self {
        let borrowed = &e;
        Self::from(borrowed)
    }
}
/// Serde adapter for `Option<chrono::Duration>`, used via
/// `#[serde(with = "serde_duration")]`.
///
/// A present duration is written as a whole number of seconds (sub-second
/// precision is dropped) and an absent one as `none`. Both forms are read
/// back, so `None` round-trips.
mod serde_duration {
    use chrono::Duration;
    use serde::{Deserialize, Deserializer, Serializer};

    /// Writes `Some(d)` as `some(d.num_seconds())` and `None` as `none`.
    pub fn serialize<S>(duration: &Option<Duration>, serializer: S) -> Result<S::Ok, S::Error>
    where
        S: Serializer,
    {
        match duration {
            // `serialize_some` keeps the output option-framed, matching the
            // `Option<i64>` read in `deserialize`; self-describing formats
            // still emit the bare integer.
            Some(duration) => serializer.serialize_some(&duration.num_seconds()),
            None => serializer.serialize_none(),
        }
    }

    /// Reads an optional number of seconds; a null/none value yields `None`.
    pub fn deserialize<'de, D>(deserializer: D) -> Result<Option<Duration>, D::Error>
    where
        D: Deserializer<'de>,
    {
        let seconds = Option::<i64>::deserialize(deserializer)?;
        Ok(seconds.map(Duration::seconds))
    }
}
mod serde_utc {
use chrono::{DateTime, Utc};
use serde::{self, Deserialize, Deserializer, Serializer};
pub fn serialize<S>(date: &DateTime<Utc>, serializer: S) -> Result<S::Ok, S::Error>
where
S: Serializer,
{
serializer.serialize_i64(date.timestamp())
}
pub fn deserialize<'de, D>(deserializer: D) -> Result<DateTime<Utc>, D::Error>
where
D: Deserializer<'de>,
{
let timestamp = i64::deserialize(deserializer)?;
DateTime::<Utc>::from_timestamp(timestamp, 0)
.ok_or(serde::de::Error::custom("invalid timestamp"))
}
}
mod serde_message {
use bytes::Bytes;
use serde::{Deserialize, Deserializer, Serializer};
pub fn serialize<S>(message: &Option<Bytes>, serializer: S) -> Result<S::Ok, S::Error>
where
S: Serializer,
{
match message {
Some(message) => serializer.serialize_bytes(message),
None => serializer.serialize_none(),
}
}
pub fn deserialize<'de, D>(deserializer: D) -> Result<Option<Bytes>, D::Error>
where
D: Deserializer<'de>,
{
let bytes = Vec::<u8>::deserialize(deserializer)?;
Ok(Some(Bytes::from(bytes)))
}
}