use std::path::{Path, PathBuf};
use crate::errors::OxenHttpError;
use crate::helpers::get_repo;
use crate::params::{DFOptsQuery, PageNumQuery, app_data, df_opts_query, path_param, query_param};
use actix_web::{HttpRequest, HttpResponse, web};
use liboxen::constants::{self, TABLE_NAME};
use liboxen::core::db::data_frames::df_db::with_df_db_manager;
use liboxen::core::db::data_frames::workspace_df_db::schema_without_oxen_cols;
use liboxen::error::OxenError;
use liboxen::model::{ParsedResource, Schema, Workspace};
use liboxen::opts::DFOpts;
use liboxen::repositories;
use liboxen::util::paginate;
use liboxen::view::data_frames::DataFramePayload;
use liboxen::view::entries::ResourceVersion;
use liboxen::view::entries::{PaginatedMetadataEntries, PaginatedMetadataEntriesResponse};
use liboxen::view::json_data_frame_view::WorkspaceJsonDataFrameViewResponse;
use liboxen::view::workspaces::RenameRequest;
use liboxen::view::{
JsonDataFrameViewResponse, JsonDataFrameViews, StatusMessage, StatusMessageDescription,
};
use actix_web::web::Bytes;
use futures_util::stream::Stream;
use std::io::{BufReader, Read};
use std::pin::Pin;
use std::task::{Context, Poll};
use tokio::sync::mpsc;
pub mod columns;
pub mod embeddings;
pub mod rows;
/// Streams the contents of a temporary export file as response body chunks and
/// deletes that file once it is no longer needed.
///
/// The file is removed when the reader reaches end-of-file or, failing that,
/// when the stream value is dropped (for example because the response was
/// abandoned part-way or a read error ended it), so an unfinished download does
/// not leave its export behind.
struct CleanupFileStream {
    /// Buffered reader over the temporary file being streamed.
    reader: BufReader<std::fs::File>,
    /// Location of the temporary file; removed during cleanup.
    temp_path: PathBuf,
    /// Scratch space for one chunk; every successful read is copied out of it.
    buffer: [u8; 8192],
    /// Take-once marker: `Some` while the temporary file still has to be
    /// removed, `None` once cleanup has run. Nothing is ever sent on it.
    tx: Option<mpsc::Sender<()>>,
}

impl CleanupFileStream {
    /// Opens `path` for streaming and takes over responsibility for deleting it.
    ///
    /// # Errors
    ///
    /// Returns the underlying I/O error if the file cannot be opened. No stream
    /// exists in that case, so the file is left untouched for the caller.
    fn new(path: PathBuf) -> std::io::Result<Self> {
        let file = std::fs::File::open(&path)?;
        let reader = BufReader::new(file);
        // Only the sender half is kept (as the cleanup marker); the receiver is
        // dropped right away.
        let (tx, _) = mpsc::channel(1);
        Ok(Self {
            reader,
            temp_path: path,
            buffer: [0; 8192],
            tx: Some(tx),
        })
    }

    /// Removes the temporary file at most once.
    ///
    /// Failures are logged rather than returned because cleanup runs from
    /// `poll_next` and `drop`, neither of which can report them.
    fn remove_temp_file(&mut self) {
        if self.tx.take().is_none() {
            // Cleanup already ran.
            return;
        }
        log::debug!("removing temporary file {:?}", self.temp_path);
        if let Err(e) = std::fs::remove_file(&self.temp_path) {
            log::error!("Failed to remove temporary file: {e:?}");
        }
    }
}

impl Stream for CleanupFileStream {
    type Item = Result<Bytes, std::io::Error>;

    /// Reads the next chunk (up to the buffer size) from the file.
    ///
    /// Always returns `Poll::Ready`: the read is performed synchronously.
    /// Yields `None` at end-of-file after removing the temporary file.
    fn poll_next(mut self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        let this = &mut *self;
        match this.reader.read(&mut this.buffer) {
            Ok(0) => {
                // End of file: the export has been fully handed out.
                this.remove_temp_file();
                Poll::Ready(None)
            }
            Ok(n) => Poll::Ready(Some(Ok(Bytes::copy_from_slice(&this.buffer[..n])))),
            Err(e) => Poll::Ready(Some(Err(e))),
        }
    }
}

impl Drop for CleanupFileStream {
    /// Removes the temporary file if the stream never reached end-of-file.
    fn drop(&mut self) {
        self.remove_temp_file();
    }
}
/// Returns one page of a workspace data frame as JSON.
///
/// Path parameters read from the request: `namespace`, `repo_name`,
/// `workspace_id` and `path`. Query options are parsed from `query`; `page` and
/// `page_size` fall back to the crate defaults and are clamped to at least 1.
///
/// * If the workspace does not exist, responds with 404 and a
///   workspace-not-found message.
/// * If the data frame is not indexed in the workspace, the rows are read from
///   the workspace's commit and returned with `is_indexed: false`.
/// * Otherwise the rows are queried from the workspace, decorated with column
///   diffs and the staged schema, and returned together with the resource.
///
/// # Errors
///
/// Propagates any error from parameter parsing or the repository calls, and
/// returns `OxenHttpError::NotFound` when an indexed data frame has no schema
/// at the workspace commit.
pub async fn get(
    req: HttpRequest,
    query: web::Query<DFOptsQuery>,
) -> Result<HttpResponse, OxenHttpError> {
    let app_data = app_data(&req)?;
    let namespace = path_param(&req, "namespace")?.to_string();
    let repo_name = path_param(&req, "repo_name")?.to_string();
    let workspace_id = path_param(&req, "workspace_id")?.to_string();
    let repo = get_repo(app_data, namespace, repo_name)?;

    let workspace = match repositories::workspaces::get(&repo, &workspace_id)? {
        Some(found) => found,
        None => {
            let not_found = StatusMessageDescription::workspace_not_found(workspace_id);
            return Ok(HttpResponse::NotFound().json(not_found));
        }
    };
    let file_path = PathBuf::from(path_param(&req, "path")?);

    // Clamp to 1 so that `page - 1` below cannot underflow and a page always
    // holds at least one row.
    let page = query.page.unwrap_or(constants::DEFAULT_PAGE_NUM).max(1);
    let page_size = query
        .page_size
        .unwrap_or(constants::DEFAULT_PAGE_SIZE)
        .max(1);

    let mut opts = DFOpts::empty();
    opts = df_opts_query::parse_opts(&query, &mut opts);
    opts.path = Some(file_path.clone());
    opts.page = Some(page);
    opts.page_size = Some(page_size);

    let is_indexed = repositories::workspaces::data_frames::is_indexed(&workspace, &file_path)?;

    if !is_indexed {
        // Not indexed: serve the rows straight from the workspace's commit.
        let commit = workspace.commit.clone();
        let resource = ParsedResource {
            path: file_path.clone(),
            version: PathBuf::from(commit.id.to_string()),
            resource: file_path.clone(),
            workspace: None,
            commit: Some(commit.clone()),
            branch: None,
        };

        // Without an explicit slice, turn the requested page into a row range.
        if opts.slice_indices().is_none() {
            // Capping `start` keeps `start + page_size` from overflowing.
            let start = page_size
                .saturating_mul(page - 1)
                .min(usize::MAX - page_size);
            let end = start + page_size;
            opts.slice = Some(format!("{start}..{end}"));
        }

        let data_frame_slice =
            repositories::data_frames::get_slice(&repo, &resource, &resource.path, &opts).await?;
        let df = data_frame_slice.slice;
        let count = match opts.has_filter_transform() {
            true => data_frame_slice.total_entries,
            false => data_frame_slice.schemas.slice.size.height,
        };

        // Prefer the schema stored for the commit; otherwise derive it from
        // the rows that were just read.
        let stored_schema =
            repositories::data_frames::schemas::get_by_path(&repo, &commit, &file_path)?;
        let df_schema = match stored_schema {
            Some(schema) => schema,
            None => Schema::from_polars(df.schema()),
        };

        let df_views =
            JsonDataFrameViews::from_df_and_opts_unpaginated(df, df_schema, count, &opts).await;
        let response = WorkspaceJsonDataFrameViewResponse {
            status: StatusMessage::resource_found(),
            data_frame: Some(df_views),
            resource: None,
            commit: None,
            derived_resource: None,
            is_indexed,
        };
        return Ok(HttpResponse::Ok().json(response));
    }

    log::debug!("querying data frame {file_path:?}");
    log::debug!("opts: {opts:?}");

    let count = repositories::workspaces::data_frames::count(&workspace, &file_path)?;
    let df = repositories::workspaces::data_frames::query(&workspace, &file_path, &opts)?;

    let committed_schema =
        repositories::data_frames::schemas::get_by_path(&repo, &workspace.commit, &file_path)?;
    let mut df_schema = match committed_schema {
        Some(schema) => schema,
        None => {
            log::error!("Failed to get schema for data frame {file_path:?}");
            return Err(OxenHttpError::NotFound);
        }
    };

    let resource = ResourceVersion {
        path: file_path.to_string_lossy().to_string(),
        version: workspace.commit.id.to_string(),
    };

    // NOTE(review): this repeats the lookup above with the same repo, commit
    // and path (as a string) — confirm whether a different schema source was
    // intended for one of the two.
    let og_schema = match repositories::data_frames::schemas::get_by_path(
        &repo,
        &workspace.commit,
        &resource.path,
    )? {
        Some(schema) => schema,
        None => Schema::from_polars(df.schema()),
    };
    df_schema.update_metadata_from_schema(&og_schema);

    let mut df_views =
        JsonDataFrameViews::from_df_and_opts_unpaginated(df, df_schema, count, &opts).await;

    repositories::workspaces::data_frames::columns::decorate_fields_with_column_diffs(
        &workspace,
        &file_path,
        &mut df_views,
    )?;

    let staged_schema =
        repositories::data_frames::schemas::get_staged_schema_with_staged_db_manager(
            &workspace.workspace_repo,
            &file_path,
        )?;
    repositories::workspaces::data_frames::columns::update_column_schemas(
        staged_schema,
        &mut df_views,
    )?;

    let response = WorkspaceJsonDataFrameViewResponse {
        status: StatusMessage::resource_found(),
        data_frame: Some(df_views),
        resource: Some(resource),
        commit: None,
        derived_resource: None,
        is_indexed,
    };
    Ok(HttpResponse::Ok().json(response))
}
/// Returns the schema of a workspace data frame as JSON, leaving out the
/// oxen-specific columns.
///
/// Path parameters read from the request: `namespace`, `repo_name`,
/// `workspace_id` and `path`. If the data frame is not yet indexed in the
/// workspace it is indexed first, so this handler can modify workspace state.
///
/// Responds with 404 and a workspace-not-found message when the workspace does
/// not exist; other failures are propagated as `OxenHttpError`.
pub async fn get_schema(req: HttpRequest) -> Result<HttpResponse, OxenHttpError> {
    let app_data = app_data(&req)?;
    let namespace = path_param(&req, "namespace")?.to_string();
    let repo_name = path_param(&req, "repo_name")?.to_string();
    let workspace_id = path_param(&req, "workspace_id")?.to_string();
    let repo = get_repo(app_data, namespace, repo_name)?;

    let workspace = match repositories::workspaces::get(&repo, &workspace_id)? {
        Some(found) => found,
        None => {
            let not_found = StatusMessageDescription::workspace_not_found(workspace_id);
            return Ok(HttpResponse::NotFound().json(not_found));
        }
    };
    let file_path = PathBuf::from(path_param(&req, "path")?);

    // The schema is read from the workspace's DuckDB file, so make sure the
    // data frame has been indexed before opening it.
    let already_indexed =
        repositories::workspaces::data_frames::is_indexed(&workspace, &file_path)?;
    if !already_indexed {
        repositories::workspaces::data_frames::index(&repo, &workspace, &file_path).await?;
    }

    let db_path = repositories::workspaces::data_frames::duckdb_path(&workspace, &file_path);
    let schema = with_df_db_manager(&db_path, |manager| {
        manager.with_conn(|conn| schema_without_oxen_cols(conn, TABLE_NAME))
    })?;

    Ok(HttpResponse::Ok().json(schema))
}
/// Picks the file extension to use for an exported data frame.
///
/// The extension is taken from `opts.output` when an output path is set and
/// from `file_path` otherwise. Returns an empty string when the chosen path has
/// no extension or the extension is not valid UTF-8.
fn determine_extension<'a>(opts: &'a DFOpts, file_path: &'a Path) -> &'a str {
    let source: &'a Path = match &opts.output {
        Some(output) => Path::new(output),
        None => file_path,
    };
    source
        .extension()
        .and_then(|ext| ext.to_str())
        .unwrap_or_default()
}
pub async fn download(
req: HttpRequest,
query: web::Query<DFOptsQuery>,
) -> Result<HttpResponse, OxenHttpError> {
let app_data = app_data(&req)?;
let namespace = path_param(&req, "namespace")?.to_string();
let repo_name = path_param(&req, "repo_name")?.to_string();
let workspace_id = path_param(&req, "workspace_id")?.to_string();
let repo = get_repo(app_data, namespace, repo_name)?;
let Some(workspace) = repositories::workspaces::get(&repo, &workspace_id)? else {
return Ok(HttpResponse::NotFound()
.json(StatusMessageDescription::workspace_not_found(workspace_id)));
};
let file_path = PathBuf::from(path_param(&req, "path")?);
let Some(filename) = file_path.file_name().and_then(|n| n.to_str()) else {
log::error!(
"Unable to get filename from request param path: {}",
file_path.display()
);
return Err(OxenHttpError::BadRequest(
"Unable to parse filename from 'path' parameter".into(),
));
};
let opts = df_opts_from_query(&query, file_path.clone());
let is_indexed = repositories::workspaces::data_frames::is_indexed(&workspace, &file_path)?;
if !is_indexed {
let file_exists = file_exists_in_workspace_or_commit(&workspace, &file_path)?;
if !file_exists {
return Err(OxenHttpError::NotFound);
}
let response = df_not_indexed_response();
return Ok(HttpResponse::Ok().json(response));
}
log::debug!("exporting data frame {file_path:?}");
log::debug!("opts: {opts:?}");
let temp_dir = std::env::temp_dir();
let extension = determine_extension(&opts, &file_path);
let temp_file = temp_dir.join(format!("{}.{}", uuid::Uuid::new_v4(), extension));
match repositories::workspaces::data_frames::export(&workspace, &file_path, &opts, &temp_file) {
Ok(_) => (),
Err(e) => {
let error_str = format!("{e:?}");
log::error!("Error exporting data frame {file_path:?}: {error_str}");
let response = StatusMessageDescription::bad_request(error_str);
return Ok(HttpResponse::BadRequest().json(response));
}
};
let contents = {
let mut file = std::fs::File::open(&temp_file)?;
let mut contents = Vec::new();
file.read_to_end(&mut contents)?;
contents
};
if let Err(e) = std::fs::remove_file(&temp_file) {
log::error!("Failed to remove temporary file: {e:?}");
}
Ok(HttpResponse::Ok()
.append_header(("Content-Type", "text/csv"))
.append_header((
"Content-Disposition",
format!("attachment; filename=\"{filename}\""),
))
.body(contents))
}
/// Builds the `DFOpts` used by the download handlers.
///
/// Starts from the options parsed out of `query`, then sets the path to
/// `file_path` and fills in `page` / `page_size`, falling back to the crate
/// defaults when the query does not provide them. No lower bound is applied to
/// either value here.
fn df_opts_from_query(query: &web::Query<DFOptsQuery>, file_path: PathBuf) -> DFOpts {
    let page = query.page.unwrap_or(constants::DEFAULT_PAGE_NUM);
    let page_size = query.page_size.unwrap_or(constants::DEFAULT_PAGE_SIZE);

    let mut base = DFOpts::empty();
    let mut opts = df_opts_query::parse_opts(query, &mut base);
    opts.path = Some(file_path);
    opts.page = Some(page);
    opts.page_size = Some(page_size);
    opts
}
/// Reports whether `file_path` exists in the workspace's commit tree or among
/// the workspace's own files.
///
/// The commit tree is checked first; the workspace files are only consulted
/// when the commit does not contain the path.
///
/// # Errors
///
/// Propagates errors from either lookup.
fn file_exists_in_workspace_or_commit(
    workspace: &Workspace,
    file_path: impl AsRef<Path>,
) -> Result<bool, OxenHttpError> {
    let in_commit = repositories::tree::get_file_by_path(
        &workspace.base_repo,
        &workspace.commit,
        &file_path,
    )?
    .is_some();
    if in_commit {
        return Ok(true);
    }

    let in_workspace = repositories::workspaces::files::exists(workspace, &file_path)?;
    Ok(in_workspace)
}
/// Builds the response body sent when a data frame exists but is not indexed:
/// a resource-found status, `is_indexed: false`, and every other field unset.
fn df_not_indexed_response() -> WorkspaceJsonDataFrameViewResponse {
    let status = StatusMessage::resource_found();
    WorkspaceJsonDataFrameViewResponse {
        status,
        is_indexed: false,
        data_frame: None,
        resource: None,
        commit: None,
        derived_resource: None,
    }
}
pub async fn download_streaming(
req: HttpRequest,
query: web::Query<DFOptsQuery>,
) -> Result<HttpResponse, OxenHttpError> {
let app_data = app_data(&req)?;
let namespace = path_param(&req, "namespace")?.to_string();
let repo_name = path_param(&req, "repo_name")?.to_string();
let workspace_id = path_param(&req, "workspace_id")?.to_string();
let repo = get_repo(app_data, namespace, repo_name)?;
let Some(workspace) = repositories::workspaces::get(&repo, &workspace_id)? else {
return Ok(HttpResponse::NotFound()
.json(StatusMessageDescription::workspace_not_found(workspace_id)));
};
let file_path = PathBuf::from(path_param(&req, "path")?);
let Some(filename) = file_path.file_name().and_then(|n| n.to_str()) else {
log::error!(
"Unable to get filename from request param path: {}",
file_path.display()
);
return Err(OxenHttpError::BadRequest(
"Unable to parse filename from 'path' parameter".into(),
));
};
let opts = df_opts_from_query(&query, file_path.clone());
let is_indexed = repositories::workspaces::data_frames::is_indexed(&workspace, &file_path)?;
if !is_indexed {
let file_exists = file_exists_in_workspace_or_commit(&workspace, &file_path)?;
if !file_exists {
return Err(OxenHttpError::NotFound);
}
let response = df_not_indexed_response();
return Ok(HttpResponse::Ok().json(response));
}
log::debug!("exporting data frame {file_path:?}");
log::debug!("opts: {opts:?}");
let temp_dir = std::env::temp_dir();
let extension = determine_extension(&opts, &file_path);
let temp_file = temp_dir.join(format!("{}.{}", uuid::Uuid::new_v4(), extension));
match repositories::workspaces::data_frames::export(&workspace, &file_path, &opts, &temp_file) {
Ok(_) => (),
Err(e) => {
log::error!("Error exporting data frame {file_path:?}: {e:?}");
let error_str = format!("{e:?}");
let response = StatusMessageDescription::bad_request(error_str);
return Ok(HttpResponse::BadRequest().json(response));
}
};
let stream = CleanupFileStream::new(temp_file)?;
Ok(HttpResponse::Ok()
.append_header(("Content-Type", "text/csv"))
.append_header((
"Content-Disposition",
format!("attachment; filename=\"{filename}\""),
))
.streaming(stream))
}
/// Lists, one page at a time, the tabular files on a branch that are indexed
/// in the given workspace.
///
/// Path parameters read from the request: `namespace`, `repo_name` and
/// `workspace_id`; the branch name comes from the `branch` query parameter and
/// paging from `query` (crate defaults when absent).
///
/// Responds with 404 and a workspace-not-found message when the workspace does
/// not exist.
///
/// # Errors
///
/// Propagates errors from parameter parsing and the repository calls,
/// including a resource-not-found error when the branch's commit is missing.
/// A failure to obtain the application data is returned as an error rather
/// than panicking.
pub async fn get_by_branch(
    req: HttpRequest,
    query: web::Query<PageNumQuery>,
) -> actix_web::Result<HttpResponse, OxenHttpError> {
    let app_data = app_data(&req)?;
    let namespace = path_param(&req, "namespace")?.to_string();
    let repo_name = path_param(&req, "repo_name")?.to_string();
    let workspace_id = path_param(&req, "workspace_id")?.to_string();
    let repo = get_repo(app_data, namespace, repo_name)?;
    let branch_name: &str = query_param(&req, "branch");

    let Some(workspace) = repositories::workspaces::get(&repo, &workspace_id)? else {
        return Ok(HttpResponse::NotFound()
            .json(StatusMessageDescription::workspace_not_found(workspace_id)));
    };

    let page = query.page.unwrap_or(constants::DEFAULT_PAGE_NUM);
    let page_size = query.page_size.unwrap_or(constants::DEFAULT_PAGE_SIZE);

    let branch = repositories::branches::get_by_name(&repo, branch_name)?;
    let commit = repositories::commits::get_by_id(&repo, &branch.commit_id)?
        .ok_or_else(|| OxenError::resource_not_found(&branch.commit_id))?;

    let entries = repositories::entries::list_tabular_files_in_repo(&repo, &commit)?;
    log::debug!("got {} tabular entries", entries.len());

    // Keep only the entries whose data frame is indexed in this workspace.
    let mut editable_entries = vec![];
    for entry in entries {
        log::debug!("considering entry {entry:?}");
        let path = PathBuf::from(&entry.filename);
        if repositories::workspaces::data_frames::is_indexed(&workspace, &path)? {
            editable_entries.push(entry);
        } else {
            log::debug!("not indexed {path:?}");
        }
    }

    let (paginated_entries, pagination) = paginate(editable_entries, page, page_size);
    Ok(HttpResponse::Ok().json(PaginatedMetadataEntriesResponse {
        status: StatusMessage::resource_found(),
        entries: PaginatedMetadataEntries {
            entries: paginated_entries,
            pagination,
        },
    }))
}
/// Returns the diff of a workspace data frame as a paginated JSON view.
///
/// Path parameters read from the request: `namespace`, `repo_name`,
/// `workspace_id` and `path`; query options come from `query`, with `page` and
/// `page_size` falling back to the crate defaults.
///
/// The workspace schema for the file is enriched with metadata from the schema
/// stored at the workspace's commit (or, when none is stored, from the schema
/// of the queried rows), and the resulting view is decorated with column diffs.
///
/// Responds with 404 and a workspace-not-found message when the workspace does
/// not exist; other failures are propagated as `OxenHttpError`.
pub async fn diff(
    req: HttpRequest,
    query: web::Query<DFOptsQuery>,
) -> actix_web::Result<HttpResponse, OxenHttpError> {
    let app_data = app_data(&req)?;
    let namespace = path_param(&req, "namespace")?.to_string();
    let repo_name = path_param(&req, "repo_name")?.to_string();
    let repo = get_repo(app_data, namespace, repo_name)?;
    let workspace_id = path_param(&req, "workspace_id")?.to_string();
    let file_path = PathBuf::from(path_param(&req, "path")?);

    let Some(workspace) = repositories::workspaces::get(&repo, &workspace_id)? else {
        return Ok(HttpResponse::NotFound()
            .json(StatusMessageDescription::workspace_not_found(workspace_id)));
    };

    let mut opts = DFOpts::empty();
    opts = df_opts_query::parse_opts(&query, &mut opts);
    opts.page = Some(query.page.unwrap_or(constants::DEFAULT_PAGE_NUM));
    opts.page_size = Some(query.page_size.unwrap_or(constants::DEFAULT_PAGE_SIZE));

    // The queried rows are only used for their schema, as a fallback below.
    let df = repositories::workspaces::data_frames::query(&workspace, &file_path, &opts)?;
    let diff_df = repositories::workspaces::data_frames::diff(&workspace, &file_path)?;

    let mut df_schema =
        repositories::workspaces::data_frames::schemas::get_by_path(&workspace, &file_path)?;

    let resource = ResourceVersion {
        path: file_path.to_string_lossy().to_string(),
        version: workspace.commit.id.to_string(),
    };

    // Borrow the path so `resource` stays whole and can be returned below
    // without being rebuilt.
    let og_schema = if let Some(schema) =
        repositories::data_frames::schemas::get_by_path(&repo, &workspace.commit, &resource.path)?
    {
        schema
    } else {
        Schema::from_polars(df.schema())
    };
    df_schema.update_metadata_from_schema(&og_schema);

    let mut df_views = JsonDataFrameViews::from_df_and_opts(diff_df, df_schema, &opts).await;
    repositories::workspaces::data_frames::columns::decorate_fields_with_column_diffs(
        &workspace,
        &file_path,
        &mut df_views,
    )?;

    let response = JsonDataFrameViewResponse {
        data_frame: df_views,
        status: StatusMessage::resource_found(),
        resource: Some(resource),
        commit: None,
        derived_resource: None,
    };
    Ok(HttpResponse::Ok().json(response))
}
/// Switches a workspace data frame between indexed and not indexed.
///
/// Path parameters read from the request: `namespace`, `repo_name`,
/// `workspace_id` and `path`. `body` is a JSON `DataFramePayload` whose
/// `is_indexed` flag is the desired state: the data frame is indexed or
/// unindexed only when that differs from its current state.
///
/// Responds with 404 and a workspace-not-found message when the workspace does
/// not exist, and with a resource-updated status otherwise. Body parsing and
/// repository errors are propagated as `OxenHttpError`.
pub async fn put(req: HttpRequest, body: String) -> Result<HttpResponse, OxenHttpError> {
    let app_data = app_data(&req)?;
    let namespace = path_param(&req, "namespace")?.to_string();
    let repo_name = path_param(&req, "repo_name")?.to_string();
    let workspace_id = path_param(&req, "workspace_id")?.to_string();
    let repo = get_repo(app_data, namespace, repo_name)?;
    let file_path = PathBuf::from(path_param(&req, "path")?);
    log::debug!("workspace {workspace_id} data frame put {file_path:?}");

    let workspace = match repositories::workspaces::get(&repo, &workspace_id)? {
        Some(found) => found,
        None => {
            let not_found = StatusMessageDescription::workspace_not_found(workspace_id);
            return Ok(HttpResponse::NotFound().json(not_found));
        }
    };

    let data: DataFramePayload = serde_json::from_str(&body)?;
    log::debug!("workspace {workspace_id} data frame put {data:?}");

    let currently_indexed =
        repositories::workspaces::data_frames::is_indexed(&workspace, &file_path)?;
    match (currently_indexed, data.is_indexed) {
        (false, true) => {
            repositories::workspaces::data_frames::index(&repo, &workspace, &file_path).await?;
        }
        (true, false) => {
            repositories::workspaces::data_frames::unindex(&workspace, &file_path)?;
        }
        // Already in the requested state: nothing to do.
        _ => {}
    }

    Ok(HttpResponse::Ok().json(StatusMessage::resource_updated()))
}
/// Restores a workspace data frame via `data_frames::restore` and reports it
/// as deleted.
///
/// Path parameters read from the request: `namespace`, `repo_name`,
/// `workspace_id` and `path`. Responds with 404 and a workspace-not-found
/// message when the workspace does not exist, and with a resource-deleted
/// status on success. Other failures are propagated as `OxenHttpError`.
pub async fn delete(req: HttpRequest) -> Result<HttpResponse, OxenHttpError> {
    let app_data = app_data(&req)?;
    let namespace = path_param(&req, "namespace")?.to_string();
    let repo_name = path_param(&req, "repo_name")?.to_string();
    let workspace_id = path_param(&req, "workspace_id")?.to_string();
    let repo = get_repo(app_data, namespace, repo_name)?;
    let file_path = PathBuf::from(path_param(&req, "path")?);

    let workspace = match repositories::workspaces::get(&repo, &workspace_id)? {
        Some(found) => found,
        None => {
            let not_found = StatusMessageDescription::workspace_not_found(workspace_id);
            return Ok(HttpResponse::NotFound().json(not_found));
        }
    };

    repositories::workspaces::data_frames::restore(&repo, &workspace, file_path).await?;

    let deleted = StatusMessage::resource_deleted();
    Ok(HttpResponse::Ok().json(deleted))
}
/// Renames a workspace data frame.
///
/// Path parameters read from the request: `namespace`, `repo_name`,
/// `workspace_id` and `path` (the current location). `body` is a JSON
/// `RenameRequest` carrying the destination in `new_path`.
///
/// Responds with 404 and a workspace-not-found message when the workspace does
/// not exist, and with a resource-updated status on success.
///
/// # Errors
///
/// Returns `OxenHttpError::BadRequest` when `new_path` is empty or already
/// names a file in the workspace's commit; body parsing and repository errors
/// are propagated.
pub async fn rename(req: HttpRequest, body: String) -> Result<HttpResponse, OxenHttpError> {
    let app_data = app_data(&req)?;
    let namespace = path_param(&req, "namespace")?.to_string();
    let repo_name = path_param(&req, "repo_name")?.to_string();
    let workspace_id = path_param(&req, "workspace_id")?.to_string();
    let repo = get_repo(app_data, namespace, repo_name)?;
    let path = PathBuf::from(path_param(&req, "path")?);

    // The body is validated before the workspace is looked up.
    let rename_request: RenameRequest = serde_json::from_str(&body)?;
    if rename_request.new_path.is_empty() {
        return Err(OxenHttpError::BadRequest("new_path cannot be empty".into()));
    }
    let new_path = PathBuf::from(rename_request.new_path);

    let workspace = match repositories::workspaces::get(&repo, &workspace_id)? {
        Some(found) => found,
        None => {
            let not_found = StatusMessageDescription::workspace_not_found(workspace_id);
            return Ok(HttpResponse::NotFound().json(not_found));
        }
    };

    // Refuse to rename onto a file that already exists at the workspace commit.
    let destination_taken =
        repositories::entries::get_file(&repo, &workspace.commit, &new_path)?.is_some();
    if destination_taken {
        return Err(OxenHttpError::BadRequest("new_path already exists".into()));
    }

    repositories::workspaces::data_frames::rename(&workspace, &path, &new_path).await?;
    Ok(HttpResponse::Ok().json(StatusMessage::resource_updated()))
}
#[cfg(test)]
mod tests {
use crate::app_data::OxenAppData;
use crate::controllers;
use crate::test;
use actix_web::{App, web};
use liboxen::error::OxenError;
use liboxen::repositories;
use liboxen::util;
use liboxen::view::json_data_frame_view::WorkspaceJsonDataFrameViewResponse;
/// Downloading a committed CSV that has been indexed in the workspace returns
/// 200 with a `text/csv` Content-Type and a body containing the column names.
#[actix_web::test]
async fn test_download_indexed_data_frame_returns_csv_content() -> Result<(), OxenError> {
liboxen::test::init_test_env();
let sync_dir = test::get_sync_dir()?;
let namespace = "Testing-Namespace";
let repo_name = "Testing-Name";
let repo = test::create_local_repo(&sync_dir, namespace, repo_name)?;
// Commit a small CSV at data/test.csv.
let csv_dir = repo.path.join("data");
util::fs::create_dir_all(&csv_dir)?;
let csv_path = csv_dir.join("test.csv");
util::fs::write_to_path(&csv_path, "col_a,col_b\n1,2\n3,4\n")?;
repositories::add(&repo, &csv_path).await?;
let commit = repositories::commit(&repo, "Add CSV")?;
// Create a workspace on that commit and index the data frame in it.
let workspace_id = uuid::Uuid::new_v4().to_string();
let workspace = repositories::workspaces::create(&repo, &commit, &workspace_id, false)?;
let file_path = std::path::Path::new("data/test.csv");
repositories::workspaces::data_frames::index(&repo, &workspace, file_path).await?;
let uri = format!(
"/oxen/{namespace}/{repo_name}/workspaces/{workspace_id}/data_frames/download/{}",
file_path.display()
);
let app = actix_web::test::init_service(
App::new()
.app_data(OxenAppData::new(sync_dir.clone()))
.route(
"/oxen/{namespace}/{repo_name}/workspaces/{workspace_id}/data_frames/download/{path:.*}",
web::get().to(controllers::workspaces::data_frames::download),
),
)
.await;
let req = actix_web::test::TestRequest::get().uri(&uri).to_request();
let resp = actix_web::test::call_service(&app, req).await;
assert_eq!(resp.status(), actix_web::http::StatusCode::OK);
let content_type = resp
.headers()
.get("Content-Type")
.unwrap()
.to_str()
.unwrap();
assert_eq!(content_type, "text/csv");
// The exported body must contain both column names.
let bytes = actix_http::body::to_bytes(resp.into_body()).await.unwrap();
let body = String::from_utf8(bytes.to_vec()).unwrap();
assert!(body.contains("col_a"));
assert!(body.contains("col_b"));
test::cleanup_sync_dir(&sync_dir)?;
Ok(())
}
/// Downloading a CSV that was added only to the workspace (never committed)
/// and is not indexed returns 200 with `is_indexed: false` and no data frame.
#[actix_web::test]
async fn test_download_unindexed_workspace_only_file_returns_200() -> Result<(), OxenError> {
liboxen::test::init_test_env();
let sync_dir = test::get_sync_dir()?;
let namespace = "Testing-Namespace";
let repo_name = "Testing-Name";
let repo = test::create_local_repo(&sync_dir, namespace, repo_name)?;
// The commit only contains a README; the CSV is not part of it.
let readme = repo.path.join("README.md");
util::fs::write_to_path(&readme, "# Test")?;
repositories::add(&repo, &readme).await?;
let commit = repositories::commit(&repo, "Initial commit")?;
let workspace_id = uuid::Uuid::new_v4().to_string();
let workspace = repositories::workspaces::create(&repo, &commit, &workspace_id, false)?;
// Add the CSV to the workspace directory only.
let workspace_csv_dir = workspace.dir().join("data");
util::fs::create_dir_all(&workspace_csv_dir)?;
let workspace_csv_path = workspace_csv_dir.join("new.csv");
util::fs::write_to_path(&workspace_csv_path, "x,y\n10,20\n")?;
repositories::workspaces::files::add(&workspace, &workspace_csv_path).await?;
let file_path = "data/new.csv";
let uri = format!(
"/oxen/{namespace}/{repo_name}/workspaces/{workspace_id}/data_frames/download/{file_path}"
);
let app = actix_web::test::init_service(
App::new()
.app_data(OxenAppData::new(sync_dir.clone()))
.route(
"/oxen/{namespace}/{repo_name}/workspaces/{workspace_id}/data_frames/download/{path:.*}",
web::get().to(controllers::workspaces::data_frames::download),
),
)
.await;
let req = actix_web::test::TestRequest::get().uri(&uri).to_request();
let resp = actix_web::test::call_service(&app, req).await;
assert_eq!(resp.status(), actix_web::http::StatusCode::OK);
let bytes = actix_http::body::to_bytes(resp.into_body()).await.unwrap();
let response: WorkspaceJsonDataFrameViewResponse = serde_json::from_slice(&bytes)?;
assert!(!response.is_indexed);
assert!(response.data_frame.is_none());
test::cleanup_sync_dir(&sync_dir)?;
Ok(())
}
/// Downloading a path that exists neither in the commit nor in the workspace
/// returns 404.
#[actix_web::test]
async fn test_download_nonexistent_path_returns_404() -> Result<(), OxenError> {
liboxen::test::init_test_env();
let sync_dir = test::get_sync_dir()?;
let namespace = "Testing-Namespace";
let repo_name = "Testing-Name";
let repo = test::create_local_repo(&sync_dir, namespace, repo_name)?;
let csv_dir = repo.path.join("data");
util::fs::create_dir_all(&csv_dir)?;
let csv_path = csv_dir.join("test.csv");
util::fs::write_to_path(&csv_path, "col_a,col_b\n1,2\n3,4\n")?;
repositories::add(&repo, &csv_path).await?;
let commit = repositories::commit(&repo, "Add CSV")?;
let workspace_id = uuid::Uuid::new_v4().to_string();
repositories::workspaces::create(&repo, &commit, &workspace_id, false)?;
// Request a different file than the one that was committed.
let nonexistent_path = "data/this_does_not_exist.csv";
let uri = format!(
"/oxen/{namespace}/{repo_name}/workspaces/{workspace_id}/data_frames/download/{nonexistent_path}"
);
let app = actix_web::test::init_service(
App::new()
.app_data(OxenAppData::new(sync_dir.clone()))
.route(
"/oxen/{namespace}/{repo_name}/workspaces/{workspace_id}/data_frames/download/{path:.*}",
web::get().to(controllers::workspaces::data_frames::download),
),
)
.await;
let req = actix_web::test::TestRequest::get().uri(&uri).to_request();
let resp = actix_web::test::call_service(&app, req).await;
assert_eq!(resp.status(), actix_web::http::StatusCode::NOT_FOUND);
test::cleanup_sync_dir(&sync_dir)?;
Ok(())
}
/// Downloading a committed CSV that has not been indexed in the workspace
/// returns 200 with `is_indexed: false` and no data frame.
#[actix_web::test]
async fn test_download_existing_unindexed_returns_200_with_is_indexed_false()
-> Result<(), OxenError> {
liboxen::test::init_test_env();
let sync_dir = test::get_sync_dir()?;
let namespace = "Testing-Namespace";
let repo_name = "Testing-Name";
let repo = test::create_local_repo(&sync_dir, namespace, repo_name)?;
let csv_dir = repo.path.join("data");
util::fs::create_dir_all(&csv_dir)?;
let csv_path = csv_dir.join("test.csv");
util::fs::write_to_path(&csv_path, "col_a,col_b\n1,2\n3,4\n")?;
repositories::add(&repo, &csv_path).await?;
let commit = repositories::commit(&repo, "Add CSV")?;
// The workspace is created but the data frame is never indexed.
let workspace_id = uuid::Uuid::new_v4().to_string();
repositories::workspaces::create(&repo, &commit, &workspace_id, false)?;
let file_path = "data/test.csv";
let uri = format!(
"/oxen/{namespace}/{repo_name}/workspaces/{workspace_id}/data_frames/download/{file_path}"
);
let app = actix_web::test::init_service(
App::new()
.app_data(OxenAppData::new(sync_dir.clone()))
.route(
"/oxen/{namespace}/{repo_name}/workspaces/{workspace_id}/data_frames/download/{path:.*}",
web::get().to(controllers::workspaces::data_frames::download),
),
)
.await;
let req = actix_web::test::TestRequest::get().uri(&uri).to_request();
let resp = actix_web::test::call_service(&app, req).await;
assert_eq!(resp.status(), actix_web::http::StatusCode::OK);
let bytes = actix_http::body::to_bytes(resp.into_body()).await.unwrap();
let response: WorkspaceJsonDataFrameViewResponse = serde_json::from_slice(&bytes)?;
assert!(!response.is_indexed);
assert!(response.data_frame.is_none());
test::cleanup_sync_dir(&sync_dir)?;
Ok(())
}
/// The streaming download of a committed CSV that has not been indexed in the
/// workspace returns 200 with `is_indexed: false` and no data frame.
#[actix_web::test]
async fn test_download_streaming_unindexed_committed_file_returns_200() -> Result<(), OxenError>
{
liboxen::test::init_test_env();
let sync_dir = test::get_sync_dir()?;
let namespace = "Testing-Namespace";
let repo_name = "Testing-Name";
let repo = test::create_local_repo(&sync_dir, namespace, repo_name)?;
let csv_dir = repo.path.join("data");
util::fs::create_dir_all(&csv_dir)?;
let csv_path = csv_dir.join("test.csv");
util::fs::write_to_path(&csv_path, "col_a,col_b\n1,2\n3,4\n")?;
repositories::add(&repo, &csv_path).await?;
let commit = repositories::commit(&repo, "Add CSV")?;
// The workspace is created but the data frame is never indexed.
let workspace_id = uuid::Uuid::new_v4().to_string();
repositories::workspaces::create(&repo, &commit, &workspace_id, false)?;
let file_path = "data/test.csv";
let uri = format!(
"/oxen/{namespace}/{repo_name}/workspaces/{workspace_id}/data_frames/download_streaming/{file_path}"
);
let app = actix_web::test::init_service(
App::new()
.app_data(OxenAppData::new(sync_dir.clone()))
.route(
"/oxen/{namespace}/{repo_name}/workspaces/{workspace_id}/data_frames/download_streaming/{path:.*}",
web::get().to(controllers::workspaces::data_frames::download_streaming),
),
)
.await;
let req = actix_web::test::TestRequest::get().uri(&uri).to_request();
let resp = actix_web::test::call_service(&app, req).await;
assert_eq!(resp.status(), actix_web::http::StatusCode::OK);
let bytes = actix_http::body::to_bytes(resp.into_body()).await.unwrap();
let response: WorkspaceJsonDataFrameViewResponse = serde_json::from_slice(&bytes)?;
assert!(!response.is_indexed);
assert!(response.data_frame.is_none());
test::cleanup_sync_dir(&sync_dir)?;
Ok(())
}
/// The streaming download of a CSV that was added only to the workspace (never
/// committed) and is not indexed returns 200 with `is_indexed: false` and no
/// data frame.
#[actix_web::test]
async fn test_download_streaming_unindexed_workspace_only_file_returns_200()
-> Result<(), OxenError> {
liboxen::test::init_test_env();
let sync_dir = test::get_sync_dir()?;
let namespace = "Testing-Namespace";
let repo_name = "Testing-Name";
let repo = test::create_local_repo(&sync_dir, namespace, repo_name)?;
// The commit only contains a README; the CSV is not part of it.
let readme = repo.path.join("README.md");
util::fs::write_to_path(&readme, "# Test")?;
repositories::add(&repo, &readme).await?;
let commit = repositories::commit(&repo, "Initial commit")?;
let workspace_id = uuid::Uuid::new_v4().to_string();
let workspace = repositories::workspaces::create(&repo, &commit, &workspace_id, false)?;
// Add the CSV to the workspace directory only.
let workspace_csv_dir = workspace.dir().join("data");
util::fs::create_dir_all(&workspace_csv_dir)?;
let workspace_csv_path = workspace_csv_dir.join("new.csv");
util::fs::write_to_path(&workspace_csv_path, "x,y\n10,20\n")?;
repositories::workspaces::files::add(&workspace, &workspace_csv_path).await?;
let file_path = "data/new.csv";
let uri = format!(
"/oxen/{namespace}/{repo_name}/workspaces/{workspace_id}/data_frames/download_streaming/{file_path}"
);
let app = actix_web::test::init_service(
App::new()
.app_data(OxenAppData::new(sync_dir.clone()))
.route(
"/oxen/{namespace}/{repo_name}/workspaces/{workspace_id}/data_frames/download_streaming/{path:.*}",
web::get().to(controllers::workspaces::data_frames::download_streaming),
),
)
.await;
let req = actix_web::test::TestRequest::get().uri(&uri).to_request();
let resp = actix_web::test::call_service(&app, req).await;
assert_eq!(resp.status(), actix_web::http::StatusCode::OK);
let bytes = actix_http::body::to_bytes(resp.into_body()).await.unwrap();
let response: WorkspaceJsonDataFrameViewResponse = serde_json::from_slice(&bytes)?;
assert!(!response.is_indexed);
assert!(response.data_frame.is_none());
test::cleanup_sync_dir(&sync_dir)?;
Ok(())
}
/// The streaming download of a path that exists neither in the commit nor in
/// the workspace returns 404.
#[actix_web::test]
async fn test_download_streaming_nonexistent_path_returns_404() -> Result<(), OxenError> {
liboxen::test::init_test_env();
let sync_dir = test::get_sync_dir()?;
let namespace = "Testing-Namespace";
let repo_name = "Testing-Name";
let repo = test::create_local_repo(&sync_dir, namespace, repo_name)?;
let csv_dir = repo.path.join("data");
util::fs::create_dir_all(&csv_dir)?;
let csv_path = csv_dir.join("test.csv");
util::fs::write_to_path(&csv_path, "col_a,col_b\n1,2\n3,4\n")?;
repositories::add(&repo, &csv_path).await?;
let commit = repositories::commit(&repo, "Add CSV")?;
let workspace_id = uuid::Uuid::new_v4().to_string();
repositories::workspaces::create(&repo, &commit, &workspace_id, false)?;
// Request a different file than the one that was committed.
let nonexistent_path = "data/this_does_not_exist.csv";
let uri = format!(
"/oxen/{namespace}/{repo_name}/workspaces/{workspace_id}/data_frames/download_streaming/{nonexistent_path}"
);
let app = actix_web::test::init_service(
App::new()
.app_data(OxenAppData::new(sync_dir.clone()))
.route(
"/oxen/{namespace}/{repo_name}/workspaces/{workspace_id}/data_frames/download_streaming/{path:.*}",
web::get().to(controllers::workspaces::data_frames::download_streaming),
),
)
.await;
let req = actix_web::test::TestRequest::get().uri(&uri).to_request();
let resp = actix_web::test::call_service(&app, req).await;
assert_eq!(resp.status(), actix_web::http::StatusCode::NOT_FOUND);
test::cleanup_sync_dir(&sync_dir)?;
Ok(())
}
/// Test helper: requests one page of a workspace data frame through the `get`
/// handler and returns the deserialized response.
///
/// Builds a fresh test service on every call, asserts that the response status
/// is 200, and panics if the body cannot be read or deserialized.
async fn get_data_frame_page(
sync_dir: &std::path::Path,
namespace: &str,
repo_name: &str,
workspace_id: &str,
file_path: &str,
page: usize,
page_size: usize,
) -> WorkspaceJsonDataFrameViewResponse {
let app = actix_web::test::init_service(
App::new()
.app_data(OxenAppData::new(sync_dir.to_path_buf()))
.route(
"/oxen/{namespace}/{repo_name}/workspaces/{workspace_id}/data_frames/resource/{path:.*}",
web::get().to(controllers::workspaces::data_frames::get),
),
)
.await;
// `page` and `page_size` are passed as query parameters.
let uri = format!(
"/oxen/{namespace}/{repo_name}/workspaces/{workspace_id}/data_frames/resource/{file_path}?page={page}&page_size={page_size}"
);
let req = actix_web::test::TestRequest::get().uri(&uri).to_request();
let resp = actix_web::test::call_service(&app, req).await;
assert_eq!(resp.status(), actix_web::http::StatusCode::OK);
let bytes = actix_http::body::to_bytes(resp.into_body())
.await
.expect("could not read response body");
serde_json::from_slice(&bytes).expect("could not deserialize data frame response")
}
/// Test helper: returns the number of rows in the response's data frame view.
///
/// Panics if the response has no data frame or the view data is not a JSON
/// array, and asserts that the array length matches the view's reported height.
fn page_row_count(response: &WorkspaceJsonDataFrameViewResponse) -> usize {
let data_frame = response
.data_frame
.as_ref()
.expect("expected a data frame in the response");
let array_len = data_frame
.view
.data
.as_array()
.expect("expected the view data to be a JSON array")
.len();
// The reported view height must agree with the rows actually returned.
assert_eq!(array_len, data_frame.view.size.height);
array_len
}
/// Pagination through the `get` handler over a 25-row CSV, first while the
/// data frame is not indexed and then again after indexing it.
///
/// With a page size of 10 the test expects 10 rows on page 1, 5 on page 3 and
/// 0 on page 4, with 3 total pages and 25 total entries reported. For the
/// unindexed case it also expects `page=0` to return 10 rows and `page_size=0`
/// to return 1 row.
#[actix_web::test]
async fn test_get_unindexed_data_frame_paginates() -> Result<(), OxenError> {
liboxen::test::init_test_env();
let sync_dir = test::get_sync_dir()?;
let namespace = "Testing-Namespace";
let repo_name = "Testing-Name";
let repo = test::create_local_repo(&sync_dir, namespace, repo_name)?;
let csv_dir = repo.path.join("data");
util::fs::create_dir_all(&csv_dir)?;
let csv_path = csv_dir.join("history.csv");
// Header plus 25 data rows.
let mut csv = String::from("id,label\n");
for i in 0..25 {
csv.push_str(&format!("{i},label_{i}\n"));
}
util::fs::write_to_path(&csv_path, &csv)?;
repositories::add(&repo, &csv_path).await?;
let commit = repositories::commit(&repo, "Add 25-row CSV")?;
let file_path = "data/history.csv";
let workspace_id = uuid::Uuid::new_v4().to_string();
let workspace = repositories::workspaces::create(&repo, &commit, &workspace_id, false)?;
// First page of the unindexed data frame: a full page of 10 rows.
let page_1 = get_data_frame_page(
&sync_dir,
namespace,
repo_name,
&workspace_id,
file_path,
1,
10,
)
.await;
assert!(!page_1.is_indexed);
assert_eq!(page_row_count(&page_1), 10);
let pagination = &page_1.data_frame.as_ref().unwrap().view.pagination;
assert_eq!(pagination.total_pages, 3);
assert_eq!(pagination.total_entries, 25);
// Last page: the remaining 5 rows.
let page_3 = get_data_frame_page(
&sync_dir,
namespace,
repo_name,
&workspace_id,
file_path,
3,
10,
)
.await;
assert_eq!(page_row_count(&page_3), 5);
// One page past the end: no rows, pagination totals unchanged.
let page_4 = get_data_frame_page(
&sync_dir,
namespace,
repo_name,
&workspace_id,
file_path,
4,
10,
)
.await;
assert_eq!(page_row_count(&page_4), 0);
let pagination = &page_4.data_frame.as_ref().unwrap().view.pagination;
assert_eq!(pagination.total_pages, 3);
assert_eq!(pagination.total_entries, 25);
// page=0 is expected to return a full first page.
let page_zero = get_data_frame_page(
&sync_dir,
namespace,
repo_name,
&workspace_id,
file_path,
0,
10,
)
.await;
assert_eq!(page_row_count(&page_zero), 10);
// page_size=0 is expected to return a single row.
let zero_page_size = get_data_frame_page(
&sync_dir,
namespace,
repo_name,
&workspace_id,
file_path,
1,
0,
)
.await;
assert_eq!(page_row_count(&zero_page_size), 1);
assert_eq!(
zero_page_size
.data_frame
.as_ref()
.unwrap()
.view
.pagination
.total_entries,
25
);
// Index the data frame and check that the same pages come back the same way.
repositories::workspaces::data_frames::index(
&repo,
&workspace,
std::path::Path::new(file_path),
)
.await?;
for (page, expected_rows) in [(1, 10), (3, 5), (4, 0)] {
let indexed_page = get_data_frame_page(
&sync_dir,
namespace,
repo_name,
&workspace_id,
file_path,
page,
10,
)
.await;
assert!(indexed_page.is_indexed);
assert_eq!(page_row_count(&indexed_page), expected_rows);
let pagination = &indexed_page.data_frame.as_ref().unwrap().view.pagination;
assert_eq!(pagination.total_pages, 3);
assert_eq!(pagination.total_entries, 25);
}
test::cleanup_sync_dir(&sync_dir)?;
Ok(())
}
}