use crate::prelude::*;
use beet_core::prelude::*;
use serde::Deserialize;
use serde::Serialize;
use serde::de::DeserializeOwned;
use std::time::SystemTime;
use uuid::Uuid;
/// A named table of `T` rows whose storage is delegated to a boxed
/// [`TableProvider`].
///
/// Every operation on the store forwards to the provider, passing `name` as
/// the bucket name.
#[derive(Component)]
pub struct TableStore<T: TableRow> {
/// Table name, handed to the provider as the bucket name on every call.
name: String,
/// Type-erased backend that performs the actual reads and writes.
provider: Box<dyn TableProvider<T>>,
}
/// Manual `Clone`: the provider is a boxed trait object, so it is duplicated
/// through [`TableProvider::box_clone_table`] instead of a derived impl.
impl<T: TableRow> Clone for TableStore<T> {
    fn clone(&self) -> Self {
        let name = self.name.clone();
        let provider = self.provider.box_clone_table();
        Self { name, provider }
    }
}
impl<T: TableRow> TableStore<T> {
    /// Builds a store that reads and writes `T` rows through `provider`,
    /// using `name` as the bucket name passed to every provider call.
    pub fn new(
        provider: impl TableProvider<T>,
        name: impl Into<String>,
    ) -> Self {
        let name = name.into();
        let provider: Box<dyn TableProvider<T>> = Box::new(provider);
        Self { name, provider }
    }

    /// The table name this store was created with.
    pub fn name(&self) -> &str {
        &self.name
    }

    /// Forwards to `BucketProvider::bucket_create` for this table's name.
    pub async fn bucket_create(&self) -> Result {
        let backend = &*self.provider;
        BucketProvider::bucket_create(backend, &self.name).await
    }

    /// Forwards to `BucketProvider::bucket_try_create` for this table's name.
    pub async fn bucket_try_create(&self) -> Result {
        let backend = &*self.provider;
        BucketProvider::bucket_try_create(backend, &self.name).await
    }

    /// Forwards to `BucketProvider::bucket_exists` for this table's name.
    pub async fn bucket_exists(&self) -> Result<bool> {
        let backend = &*self.provider;
        BucketProvider::bucket_exists(backend, &self.name).await
    }

    /// Forwards to `BucketProvider::bucket_remove` for this table's name.
    pub async fn bucket_remove(&self) -> Result {
        let backend = &*self.provider;
        BucketProvider::bucket_remove(backend, &self.name).await
    }

    /// Stores `body` by delegating to [`TableProvider::insert_row`].
    pub async fn push(&self, body: T) -> Result {
        let pending = self.provider.insert_row(&self.name, body);
        pending.await
    }

    /// Stores `body` only when no row with the same id is reported by
    /// [`Self::exists`]; otherwise fails with a "Row already exists" error.
    ///
    /// The existence check and the insert are two separate provider calls.
    pub async fn try_push(&self, body: T) -> Result {
        let id = body.id();
        match self.exists(id).await? {
            true => bevybail!("Row already exists: {}", id),
            false => self.push(body).await,
        }
    }

    /// Asks the provider whether an entry exists at the path derived from
    /// the string form of `id`.
    pub async fn exists(&self, id: Uuid) -> Result<bool> {
        let route = RoutePath::new(id.to_string());
        BucketProvider::exists(&*self.provider, &self.name, &route).await
    }

    /// Returns the paths reported by `BucketProvider::list` for this table.
    pub async fn list(&self) -> Result<Vec<RoutePath>> {
        let backend = &*self.provider;
        BucketProvider::list(backend, &self.name).await
    }

    /// Fetches the row for `id` by delegating to [`TableProvider::get_row`].
    pub async fn get(&self, id: Uuid) -> Result<T> {
        let pending = self.provider.get_row(&self.name, id);
        pending.await
    }

    /// Lists every path in the table and fetches the row behind each one,
    /// returning `(path, row)` pairs.
    ///
    /// Each path is parsed as a UUID after stripping leading `/` characters;
    /// a path that does not parse yields an "Invalid UUID in path" error.
    /// The per-row futures are handed to `async_ext::try_join_all`.
    // NOTE(review): presumably `try_join_all` drives the fetches together and
    // stops at the first error — confirm against `async_ext`.
    pub async fn get_all(&self) -> Result<Vec<(RoutePath, T)>> {
        let paths = self.list().await?;
        let fetches = paths.into_iter().map(async |path| {
            let raw = path.to_string();
            let id = raw
                .trim_start_matches('/')
                .parse::<Uuid>()
                .map_err(|e| {
                    bevyhow!("Invalid UUID in path {}: {}", path, e)
                })?;
            let data = self.get(id).await?;
            Ok::<_, BevyError>((path, data))
        });
        fetches.xmap(async_ext::try_join_all).await
    }

    /// Removes the entry at the path derived from the string form of `id`.
    pub async fn remove(&self, id: Uuid) -> Result {
        let route = RoutePath::new(id.to_string());
        BucketProvider::remove(&*self.provider, &self.name, &route).await
    }

    /// Forwards to `BucketProvider::public_url` for `path` in this table.
    pub async fn public_url(&self, path: &RoutePath) -> Result<Option<String>> {
        let backend = &*self.provider;
        BucketProvider::public_url(backend, &self.name, path).await
    }

    /// Returns whatever `BucketProvider::region` reports for the provider.
    pub async fn region(&self) -> Option<String> {
        let backend = &*self.provider;
        BucketProvider::region(backend)
    }
}
/// A storable value that carries its own [`Uuid`] key.
pub trait TableRow: TableContent {
    /// The identifier of this row; its string form is used as the row's path
    /// by the default [`TableProvider`] methods.
    fn id(&self) -> Uuid;

    /// The timestamp embedded in [`Self::id`], converted with
    /// `Timestamp::to_unix` into a [`Duration`] of seconds and nanoseconds.
    ///
    /// # Panics
    /// Panics when `Uuid::get_timestamp` returns `None` for the id.
    fn timestamp(&self) -> Duration {
        let (secs, nanos) = self.id().get_timestamp().unwrap().to_unix();
        Duration::new(secs, nanos)
    }
}
/// Bundle of bounds required of anything stored in a table: owned
/// (`'static`), thread-safe (`Send + Sync`), cloneable, and convertible to
/// and from a serialized form via serde.
///
/// The trait has no methods of its own.
pub trait TableContent:
'static + Send + Sync + Clone + Serialize + DeserializeOwned
{
}
/// Blanket impl: every type meeting the bounds is automatically
/// [`TableContent`], so the trait never needs to be implemented by hand.
impl<T: 'static + Send + Sync + Clone + Serialize + DeserializeOwned>
    TableContent for T
{
}
/// A payload of type `T` paired with an id and a creation time, usable as a
/// [`TableRow`] when `T` is [`TableContent`].
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct TableItem<T> {
/// Row identifier; `TableItem::new` sets it from `Uuid::now_v7()`.
pub id: Uuid,
/// Creation time; `TableItem::new` sets it from `SystemTime::now()`.
pub created: SystemTime,
/// The wrapped payload.
pub data: T,
}
impl<T> TableItem<T> {
    /// Wraps `data` in a new item, assigning a freshly generated v7 UUID and
    /// stamping it with the current system time.
    pub fn new(data: T) -> Self {
        let id = Uuid::now_v7();
        let created = SystemTime::now();
        Self { id, created, data }
    }
}
impl<T: TableContent> TableRow for TableItem<T> {
    /// Returns the item's stored `id` field.
    fn id(&self) -> Uuid {
        let Self { id, .. } = self;
        *id
    }
}
/// A [`BucketProvider`] that can store and load typed rows.
///
/// The default methods encode rows as JSON and address each row by the
/// string form of its id.
pub trait TableProvider<T: TableRow>:
    BucketProvider + 'static + Send + Sync
{
    /// Clones this provider into a new boxed trait object.
    fn box_clone_table(&self) -> Box<dyn TableProvider<T>>;

    /// Serializes `body` to JSON bytes and inserts them into `bucket_name`
    /// at the path derived from `body.id()`.
    ///
    /// A serialization failure is reported through the returned future as a
    /// "Failed to serialize" error; no insert is attempted in that case.
    fn insert_row(
        &self,
        bucket_name: &str,
        body: T,
    ) -> SendBoxedFuture<Result> {
        let route = RoutePath::new(body.id().to_string());
        let bytes = match serde_json::to_vec(&body) {
            Ok(bytes) => bytes,
            Err(e) => {
                return Box::pin(async move {
                    bevybail!("Failed to serialize: {}", e)
                });
            }
        };
        BucketProvider::insert(self, bucket_name, &route, bytes.into())
    }

    /// Loads the bytes stored in `bucket_name` at the path derived from `id`
    /// and deserializes them from JSON into `T`.
    ///
    /// Errors from the underlying `get` are propagated; malformed JSON yields
    /// a "Failed to deserialize" error.
    fn get_row(
        &self,
        bucket_name: &str,
        id: Uuid,
    ) -> SendBoxedFuture<Result<T>> {
        let route = RoutePath::new(id.to_string());
        // The fetch future is created here, outside the async block, so the
        // block only has to own that future rather than borrow `self`.
        let pending = BucketProvider::get(self, bucket_name, &route);
        Box::pin(async move {
            let raw = pending.await?;
            let parsed = serde_json::from_slice(&raw);
            match parsed {
                Err(e) => bevybail!("Failed to deserialize: {}", e),
                Ok(row) => Ok(row),
            }
        })
    }
}
/// Creates a [`TableStore`] named `"temp"` backed by a new
/// `InMemoryProvider`.
pub fn temp_table<T: TableRow>() -> TableStore<T> {
    let provider = InMemoryProvider::new();
    TableStore::new(provider, "temp")
}
/// Picks the backend for a table based on `access`.
///
/// - `ServiceAccess::Local`: an `FsBucketProvider` built from `fs_path`,
///   with an empty table name.
/// - `ServiceAccess::Remote` with the `aws` feature on a non-wasm32 target:
///   a `DynamoDbProvider`, using `table_name`.
/// - `ServiceAccess::Remote` otherwise: the same filesystem store as `Local`.
// `table_name` is only read by the Dynamo arm, which is compiled out without
// the `aws` feature or on wasm32; the allow silences that warning.
#[allow(unused_variables)]
pub async fn dynamo_fs_selector<T: TableRow>(
    fs_path: &AbsPathBuf,
    table_name: &str,
    access: ServiceAccess,
) -> TableStore<T> {
    match access {
        ServiceAccess::Local => {
            debug!("Table Selector - FS: {fs_path}");
            let provider = FsBucketProvider::new(fs_path.clone());
            TableStore::new(provider, "")
        }
        #[cfg(not(all(feature = "aws", not(target_arch = "wasm32"))))]
        ServiceAccess::Remote => {
            debug!("Table Selector - FS (no aws feature): {fs_path}");
            let provider = FsBucketProvider::new(fs_path.clone());
            TableStore::new(provider, "")
        }
        #[cfg(all(feature = "aws", not(target_arch = "wasm32")))]
        ServiceAccess::Remote => {
            debug!("Table Selector - Dynamo: {table_name}");
            TableStore::new(DynamoDbProvider::create().await, table_name)
        }
    }
}
/// Reusable test scenario for [`TableProvider`] implementations.
///
/// The module is `pub` so that provider-specific tests can call
/// [`run`](table_test::run) with their own provider.
#[cfg(test)]
pub mod table_test {
use crate::prelude::*;
use serde::Deserialize;
use serde::Serialize;
use beet_core::prelude::*;
/// Recursive payload used to check that nested data round-trips.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct MyObject {
some_key: String,
some_vec: Vec<MyObject>,
}
/// Drives `provider` through a full create / insert / read / delete cycle
/// on the table `"beet-test-table"`, panicking on the first unmet
/// expectation.
pub async fn run(provider: impl TableProvider<TableItem<MyObject>>) {
let table = TableStore::new(provider, "beet-test-table");
let body = TableItem::new(MyObject {
some_key: "some_value".into(),
some_vec: vec![MyObject {
some_key: "nested".into(),
some_vec: vec![],
}],
});
let id = body.id();
let path = RoutePath::new(id.to_string());
// Best-effort cleanup of a previous run; the result is deliberately ignored.
table.bucket_remove().await.ok();
table.bucket_exists().await.unwrap().xpect_false();
table.bucket_try_create().await.unwrap();
// Before the insert the row is absent, and removing it must fail.
table.exists(id).await.unwrap().xpect_false();
table.remove(id).await.xpect_err();
table.push(body.clone()).await.unwrap();
table.bucket_exists().await.unwrap().xpect_true();
table.exists(id).await.unwrap().xpect_true();
// The listing holds exactly the one path derived from the row id.
table.list().await.unwrap().xpect_eq(vec![path.clone()]);
// The row is read twice; both reads must return the inserted value.
table.get(id).await.unwrap().xpect_eq(body.clone());
table.get(id).await.unwrap().xpect_eq(body);
// After removal the row can no longer be fetched.
table.remove(id).await.unwrap();
table.get(id).await.xpect_err();
// Tear down: the bucket must be gone afterwards.
table.bucket_remove().await.unwrap();
table.bucket_exists().await.unwrap().xpect_false();
}
}