use std::sync::Arc;
use ahash::AHashMap;
use arrow::record_batch::RecordBatch;
use object_store::{ObjectStore, ObjectStoreExt, path::Path as ObjectPath};
use parquet::{
arrow::{ArrowWriter, arrow_reader::ParquetRecordBatchReaderBuilder},
file::{
metadata::KeyValue,
properties::WriterProperties,
reader::{FileReader, SerializedFileReader},
statistics::Statistics,
},
};
/// Writes a single record batch as a parquet file at `path`.
///
/// Convenience wrapper that delegates to [`write_batches_to_parquet`] with a
/// one-element slice.
///
/// # Errors
///
/// Returns an error if resolving the store or writing the file fails.
pub async fn write_batch_to_parquet(
    batch: RecordBatch,
    path: &str,
    storage_options: Option<AHashMap<String, String>>,
    compression: Option<parquet::basic::Compression>,
    max_row_group_size: Option<usize>,
) -> anyhow::Result<()> {
    let batches = [batch];
    write_batches_to_parquet(
        &batches,
        path,
        storage_options,
        compression,
        max_row_group_size,
    )
    .await
}
/// Writes record batches as a single parquet file at `path`.
///
/// `path` may be a plain filesystem path or a URI; the backing store is
/// resolved via [`create_object_store_from_path`], and the store-relative
/// object path is built from the returned base path.
///
/// # Errors
///
/// Returns an error if resolving the store or writing the object fails.
pub async fn write_batches_to_parquet(
    batches: &[RecordBatch],
    path: &str,
    storage_options: Option<AHashMap<String, String>>,
    compression: Option<parquet::basic::Compression>,
    max_row_group_size: Option<usize>,
) -> anyhow::Result<()> {
    let (object_store, base_path, _) = create_object_store_from_path(path, storage_options)?;
    // Prefix with the store's base path when one was reported.
    let full_path = if base_path.is_empty() {
        path.to_string()
    } else {
        format!("{base_path}/{path}")
    };
    write_batches_to_object_store(
        batches,
        object_store,
        &ObjectPath::from(full_path),
        compression,
        max_row_group_size,
        None,
    )
    .await
}
/// Reads all record batches and the schema from a parquet object at `path`.
///
/// A zero-byte object is treated as "no data": an empty batch list with an
/// empty schema is returned rather than a parquet decode error.
///
/// # Errors
///
/// Returns an error if the download fails or the bytes are not valid parquet.
pub async fn read_parquet_from_object_store(
    object_store: Arc<dyn ObjectStore>,
    path: &ObjectPath,
) -> anyhow::Result<(Vec<RecordBatch>, Arc<arrow::datatypes::Schema>)> {
    // Fetch the full object into memory before decoding.
    let data = object_store.get(path).await?.bytes().await?;
    if data.is_empty() {
        let empty_schema = arrow::datatypes::Schema::new(Vec::<arrow::datatypes::Field>::new());
        return Ok((Vec::new(), Arc::new(empty_schema)));
    }
    let builder = ParquetRecordBatchReaderBuilder::try_new(data)?;
    let schema = builder.schema().clone();
    // Collect fallibly: short-circuits on the first decode error.
    let batches = builder
        .build()?
        .collect::<Result<Vec<RecordBatch>, _>>()?;
    Ok((batches, schema))
}
/// Encodes `batches` into an in-memory parquet buffer and uploads it to
/// `path` in `object_store` as a single PUT.
///
/// The schema is taken from the first batch; `compression` defaults to
/// SNAPPY and `max_row_group_size` defaults to 5000 rows.
///
/// # Errors
///
/// Returns an error if `batches` is empty, if encoding fails, or if the
/// upload fails.
pub async fn write_batches_to_object_store(
    batches: &[RecordBatch],
    object_store: Arc<dyn ObjectStore>,
    path: &ObjectPath,
    compression: Option<parquet::basic::Compression>,
    max_row_group_size: Option<usize>,
    key_value_metadata: Option<Vec<KeyValue>>,
) -> anyhow::Result<()> {
    // Guard: the writer needs a schema from the first batch; indexing an
    // empty slice would panic.
    anyhow::ensure!(
        !batches.is_empty(),
        "Cannot write parquet object '{path}' from an empty batch slice"
    );
    let mut props_builder = WriterProperties::builder()
        .set_compression(compression.unwrap_or(parquet::basic::Compression::SNAPPY))
        // The builder takes the row-group row limit as a plain `usize` via
        // `set_max_row_group_size`.
        .set_max_row_group_size(max_row_group_size.unwrap_or(5000));
    if let Some(kv) = key_value_metadata {
        props_builder = props_builder.set_key_value_metadata(Some(kv));
    }
    let writer_props = props_builder.build();
    let mut buffer = Vec::new();
    let mut writer = ArrowWriter::try_new(&mut buffer, batches[0].schema(), Some(writer_props))?;
    for batch in batches {
        writer.write(batch)?;
    }
    // `close` finalizes the footer; without it the buffer is not valid parquet.
    writer.close()?;
    object_store.put(path, buffer.into()).await?;
    Ok(())
}
/// Removes duplicate rows across `batches`, keeping the first occurrence of
/// each distinct row (duplicates are detected across batch boundaries).
///
/// Rows are compared via their arrow row-format encoding. Batches whose rows
/// are all duplicates are dropped from the output entirely.
fn deduplicate_record_batches(batches: &[RecordBatch]) -> anyhow::Result<Vec<RecordBatch>> {
    let Some(first) = batches.first() else {
        return Ok(Vec::new());
    };
    let schema = first.schema();
    // One sort field per column, in schema order, so row encodings are comparable.
    let sort_fields: Vec<arrow_row::SortField> = schema
        .fields()
        .iter()
        .map(|field| arrow_row::SortField::new(field.data_type().clone()))
        .collect();
    let converter = arrow_row::RowConverter::new(sort_fields)?;
    let mut seen = std::collections::HashSet::<Vec<u8>>::new();
    let mut deduped: Vec<RecordBatch> = Vec::new();
    for batch in batches {
        let rows = converter.convert_columns(batch.columns())?;
        // Indices of rows not seen before; `insert` returns true on first sight.
        let keep: Vec<u32> = rows
            .iter()
            .enumerate()
            .filter(|(_, row)| seen.insert(row.as_ref().to_vec()))
            .map(|(i, _)| i as u32)
            .collect();
        if keep.is_empty() {
            continue;
        }
        let take_indices = arrow::array::UInt32Array::from(keep);
        let columns = batch
            .columns()
            .iter()
            .map(|col| arrow::compute::take(col.as_ref(), &take_indices, None))
            .collect::<Result<Vec<arrow::array::ArrayRef>, _>>()?;
        deduped.push(RecordBatch::try_new(schema.clone(), columns)?);
    }
    Ok(deduped)
}
/// Combines several parquet files into one at `new_file_path`, then deletes
/// the originals.
///
/// The store is resolved from the first input path; all paths are assumed to
/// live in the same store. With zero or one input this is a no-op.
///
/// # Errors
///
/// Returns an error if store resolution or the combine operation fails.
pub async fn combine_parquet_files(
    file_paths: Vec<&str>,
    new_file_path: &str,
    storage_options: Option<AHashMap<String, String>>,
    compression: Option<parquet::basic::Compression>,
    max_row_group_size: Option<usize>,
    deduplicate: Option<bool>,
) -> anyhow::Result<()> {
    if file_paths.len() <= 1 {
        return Ok(());
    }
    let (object_store, base_path, _) =
        create_object_store_from_path(file_paths[0], storage_options)?;
    // Single mapping rule for both inputs and output: prefix with the store's
    // base path when one was reported.
    let to_object_path = |p: &str| -> ObjectPath {
        if base_path.is_empty() {
            ObjectPath::from(p)
        } else {
            ObjectPath::from(format!("{base_path}/{p}"))
        }
    };
    let object_paths: Vec<ObjectPath> = file_paths.iter().map(|p| to_object_path(p)).collect();
    let new_object_path = to_object_path(new_file_path);
    combine_parquet_files_from_object_store(
        object_store,
        object_paths,
        &new_object_path,
        compression,
        max_row_group_size,
        deduplicate,
    )
    .await
}
/// Downloads `file_paths`, concatenates their record batches, writes the
/// combined data to `new_file_path`, then deletes the source objects.
///
/// The first file's schema (including its metadata) is re-applied to every
/// batch so that schema-level metadata survives the merge. When `deduplicate`
/// is `Some(true)`, duplicate rows are removed before writing. With zero or
/// one input this is a no-op.
///
/// # Errors
///
/// Returns an error on any download, decode, schema-mismatch, write, or
/// delete failure.
pub async fn combine_parquet_files_from_object_store(
    object_store: Arc<dyn ObjectStore>,
    file_paths: Vec<ObjectPath>,
    new_file_path: &ObjectPath,
    compression: Option<parquet::basic::Compression>,
    max_row_group_size: Option<usize>,
    deduplicate: Option<bool>,
) -> anyhow::Result<()> {
    if file_paths.len() <= 1 {
        return Ok(());
    }
    let mut all_batches: Vec<RecordBatch> = Vec::new();
    let mut schema_with_metadata: Option<Arc<arrow::datatypes::Schema>> = None;
    for path in &file_paths {
        let result: object_store::GetResult = object_store.get(path).await?;
        let data = result.bytes().await?;
        let builder = ParquetRecordBatchReaderBuilder::try_new(data)?;
        // Capture the first file's schema; it carries the metadata we re-apply.
        if schema_with_metadata.is_none() {
            schema_with_metadata = Some(builder.schema().clone());
        }
        for batch in builder.build()? {
            all_batches.push(batch?);
        }
    }
    if let Some(schema) = &schema_with_metadata {
        // Propagate schema-mismatch errors instead of panicking: this is a
        // `Result`-returning API, so `expect` here would abort the caller.
        all_batches = all_batches
            .into_iter()
            .map(|b| RecordBatch::try_new(schema.clone(), b.columns().to_vec()))
            .collect::<Result<Vec<_>, _>>()?;
    }
    let batches_to_write = if deduplicate.unwrap_or(false) {
        deduplicate_record_batches(&all_batches)?
    } else {
        all_batches
    };
    write_batches_to_object_store(
        &batches_to_write,
        object_store.clone(),
        new_file_path,
        compression,
        max_row_group_size,
        None,
    )
    .await?;
    // Remove the originals, keeping the target if it was one of the inputs.
    for path in &file_paths {
        if path != new_file_path {
            object_store.delete(path).await?;
        }
    }
    Ok(())
}
/// Reads the min/max Int64 statistics for `column_name` from the parquet
/// footer of the file at `file_path`.
///
/// Resolves the backing store from `file_path`, then delegates to
/// [`min_max_from_parquet_metadata_object_store`].
///
/// # Errors
///
/// Returns an error if store resolution or the metadata read fails.
pub async fn min_max_from_parquet_metadata(
    file_path: &str,
    storage_options: Option<AHashMap<String, String>>,
    column_name: &str,
) -> anyhow::Result<(u64, u64)> {
    let (object_store, base_path, _) = create_object_store_from_path(file_path, storage_options)?;
    // Prefix with the store's base path when one was reported.
    let full_path = if base_path.is_empty() {
        file_path.to_string()
    } else {
        format!("{base_path}/{file_path}")
    };
    min_max_from_parquet_metadata_object_store(
        object_store,
        &ObjectPath::from(full_path),
        column_name,
    )
    .await
}
/// Reads the min/max Int64 statistics for `column_name` from the parquet
/// footer at `file_path`, aggregated across all row groups.
///
/// # Errors
///
/// Returns an error if the column is missing, lacks statistics in some row
/// group, is not of type i64, or if either bound is negative and therefore
/// not representable as `u64`.
pub async fn min_max_from_parquet_metadata_object_store(
    object_store: Arc<dyn ObjectStore>,
    file_path: &ObjectPath,
    column_name: &str,
) -> anyhow::Result<(u64, u64)> {
    let result: object_store::GetResult = object_store.get(file_path).await?;
    let data = result.bytes().await?;
    // Only the footer metadata is needed; no row data is decoded.
    let reader = SerializedFileReader::new(data)?;
    let metadata = reader.metadata();
    let mut overall_min_value: Option<i64> = None;
    let mut overall_max_value: Option<i64> = None;
    for i in 0..metadata.num_row_groups() {
        let row_group = metadata.row_group(i);
        for j in 0..row_group.num_columns() {
            let col_metadata = row_group.column(j);
            if col_metadata.column_path().string() != column_name {
                continue;
            }
            let Some(stats) = col_metadata.statistics() else {
                anyhow::bail!(
                    "Warning: Statistics not available for column '{column_name}' in row group {i}."
                );
            };
            let Statistics::Int64(int64_stats) = stats else {
                anyhow::bail!("Warning: Column name '{column_name}' is not of type i64.");
            };
            // Fold this row group's bounds into the running aggregates.
            if let Some(&min_value) = int64_stats.min_opt() {
                overall_min_value =
                    Some(overall_min_value.map_or(min_value, |m| m.min(min_value)));
            }
            if let Some(&max_value) = int64_stats.max_opt() {
                overall_max_value =
                    Some(overall_max_value.map_or(max_value, |m| m.max(max_value)));
            }
        }
    }
    if let (Some(min), Some(max)) = (overall_min_value, overall_max_value) {
        // `as u64` would silently wrap negative values; fail loudly instead.
        let min = u64::try_from(min).map_err(|_| {
            anyhow::anyhow!("Column '{column_name}' min value {min} is negative")
        })?;
        let max = u64::try_from(max).map_err(|_| {
            anyhow::anyhow!("Column '{column_name}' max value {max} is negative")
        })?;
        Ok((min, max))
    } else {
        anyhow::bail!(
            "Column '{column_name}' not found or has no Int64 statistics in any row group."
        )
    }
}
/// Creates an [`ObjectStore`] for `path`, dispatching on the URI scheme.
///
/// Returns `(store, base_path, normalized_uri)`. For local stores
/// `base_path` is empty; for cloud stores it is the path component inside
/// the bucket/container (see the individual `create_*_store` helpers).
///
/// Plain filesystem paths (no scheme) are normalized to `file://` URIs
/// first. Without the `cloud` feature, any cloud-scheme URI is rejected
/// with an error rather than silently falling back to the local store.
///
/// # Errors
///
/// Returns an error if the scheme requires the disabled `cloud` feature or
/// if constructing the concrete store fails.
#[allow(unused_variables, clippy::needless_pass_by_value)]
pub fn create_object_store_from_path(
    path: &str,
    storage_options: Option<AHashMap<String, String>>,
) -> anyhow::Result<(Arc<dyn ObjectStore>, String, String)> {
    let uri = normalize_path_to_uri(path);
    match uri.as_str() {
        #[cfg(feature = "cloud")]
        s if s.starts_with("s3://") => create_s3_store(&uri, storage_options),
        #[cfg(feature = "cloud")]
        s if s.starts_with("gs://") || s.starts_with("gcs://") => {
            create_gcs_store(&uri, storage_options)
        }
        #[cfg(feature = "cloud")]
        s if s.starts_with("az://") => create_azure_store(&uri, storage_options),
        #[cfg(feature = "cloud")]
        s if s.starts_with("abfs://") => create_abfs_store(&uri, storage_options),
        #[cfg(feature = "cloud")]
        s if s.starts_with("http://") || s.starts_with("https://") => {
            create_http_store(&uri, storage_options)
        }
        // Compiled only without `cloud`: catches every cloud scheme above and
        // reports the missing feature explicitly.
        #[cfg(not(feature = "cloud"))]
        s if s.starts_with("s3://")
            || s.starts_with("gs://")
            || s.starts_with("gcs://")
            || s.starts_with("az://")
            || s.starts_with("abfs://")
            || s.starts_with("http://")
            || s.starts_with("https://") =>
        {
            anyhow::bail!("Cloud storage support requires the 'cloud' feature: {uri}")
        }
        s if s.starts_with("file://") => create_local_store(&uri, true),
        // Fallback: unknown schemes are handed to the local store as-is.
        _ => create_local_store(&uri, false),
    }
}
/// Normalizes `path` to a URI string.
///
/// Anything already containing `://` is passed through untouched; absolute
/// filesystem paths become `file://` URIs; relative paths are resolved
/// against the current working directory first.
///
/// # Panics
///
/// Panics if the current working directory cannot be determined while
/// resolving a relative path.
#[must_use]
pub fn normalize_path_to_uri(path: &str) -> String {
    if path.contains("://") {
        path.to_string()
    } else if is_absolute_path(path) {
        path_to_file_uri(path)
    } else {
        let absolute_path = std::env::current_dir()
            .expect("failed to resolve current working directory")
            .join(path);
        path_to_file_uri(&absolute_path.to_string_lossy())
    }
}
/// Returns `true` if `path` is absolute on any supported platform:
/// POSIX (`/...`), Windows drive (`C:\...` or `C:/...`), or UNC (`\\server\...`).
#[must_use]
fn is_absolute_path(path: &str) -> bool {
    if path.starts_with('/') || path.starts_with("\\\\") {
        return true;
    }
    // Windows drive path: a drive letter followed by `:\` or `:/`.
    // (`len()` is a byte length, matching the original check.)
    path.len() >= 3
        && path.chars().nth(1) == Some(':')
        && matches!(path.chars().nth(2), Some('\\' | '/'))
}
/// Converts an absolute native path into a `file://` URI.
///
/// POSIX paths get `file://` prepended; Windows drive paths become
/// `file:///C:/...` with forward slashes; UNC paths (`\\server\share`)
/// become `file://server/share`.
#[must_use]
fn path_to_file_uri(path: &str) -> String {
    // POSIX absolute path (checked first, mirroring the dispatch order).
    if path.starts_with('/') {
        return format!("file://{path}");
    }
    // UNC path: strip the leading `\\` and use the server as the authority.
    if let Some(unc) = path.strip_prefix("\\\\") {
        return format!("file://{}", unc.replace('\\', "/"));
    }
    // Windows drive path: normalize separators and add the empty authority.
    if path.len() >= 3 && path.chars().nth(1) == Some(':') {
        return format!("file:///{}", path.replace('\\', "/"));
    }
    // Anything else: prepend the scheme verbatim.
    format!("file://{path}")
}
/// Converts a `file://` URI back to a native Windows path.
///
/// Strips the `file://` (or bare `file:`) scheme, drops any leading slashes
/// (so `file:///C:/x` yields `C:\x`), and converts separators to backslashes.
#[cfg(windows)]
pub(crate) fn file_uri_to_native_path(uri: &str) -> String {
    let stripped = ["file://", "file:"]
        .iter()
        .find_map(|prefix| uri.strip_prefix(prefix))
        .unwrap_or(uri);
    stripped.trim_start_matches('/').replace('/', "\\")
}
/// Converts a `file://` URI back to a native POSIX path; non-URI input is
/// returned unchanged.
#[cfg(not(windows))]
pub(crate) fn file_uri_to_native_path(uri: &str) -> String {
    match uri.strip_prefix("file://") {
        Some(native) => native.to_string(),
        None => uri.to_string(),
    }
}
/// Creates a [`LocalFileSystem`](object_store::local::LocalFileSystem) store
/// rooted at the given location.
///
/// When `is_file_uri` is set, the `file://` URI is converted back to a
/// native path first; otherwise the string is used verbatim as the prefix.
/// The returned base path is always empty for local stores.
fn create_local_store(
    uri: &str,
    is_file_uri: bool,
) -> anyhow::Result<(Arc<dyn ObjectStore>, String, String)> {
    let native_path = if is_file_uri {
        file_uri_to_native_path(uri)
    } else {
        uri.to_string()
    };
    let store = object_store::local::LocalFileSystem::new_with_prefix(&native_path)?;
    Ok((Arc::new(store), String::new(), uri.to_string()))
}
/// Creates an S3 store for `s3://bucket/...` URIs.
///
/// Supported `storage_options` keys: `endpoint_url`, `region`,
/// `access_key_id`/`key`, `secret_access_key`/`secret`,
/// `session_token`/`token`, `allow_http`; unknown keys are logged and ignored.
#[cfg(feature = "cloud")]
fn create_s3_store(
    uri: &str,
    storage_options: Option<AHashMap<String, String>>,
) -> anyhow::Result<(Arc<dyn ObjectStore>, String, String)> {
    let (url, path) = parse_url_and_path(uri)?;
    let bucket = extract_host(&url, "Invalid S3 URI: missing bucket")?;
    let mut builder = object_store::aws::AmazonS3Builder::new().with_bucket_name(&bucket);
    for (key, value) in storage_options.unwrap_or_default() {
        builder = match key.as_str() {
            "endpoint_url" => builder.with_endpoint(&value),
            "region" => builder.with_region(&value),
            "access_key_id" | "key" => builder.with_access_key_id(&value),
            "secret_access_key" | "secret" => builder.with_secret_access_key(&value),
            "session_token" | "token" => builder.with_token(&value),
            "allow_http" => builder.with_allow_http(value.to_lowercase() == "true"),
            _ => {
                log::warn!("Unknown S3 storage option: {key}");
                builder
            }
        };
    }
    let s3_store = builder.build()?;
    Ok((Arc::new(s3_store), path, uri.to_string()))
}
/// Creates a Google Cloud Storage store for `gs://` / `gcs://` URIs.
///
/// Supported `storage_options` keys: `service_account_path`,
/// `service_account_key`, `application_credentials`; `project_id` is
/// accepted but only logged, and unknown keys are logged and ignored.
#[cfg(feature = "cloud")]
fn create_gcs_store(
    uri: &str,
    storage_options: Option<AHashMap<String, String>>,
) -> anyhow::Result<(Arc<dyn ObjectStore>, String, String)> {
    let (url, path) = parse_url_and_path(uri)?;
    let bucket = extract_host(&url, "Invalid GCS URI: missing bucket")?;
    let mut builder = object_store::gcp::GoogleCloudStorageBuilder::new().with_bucket_name(&bucket);
    if let Some(options) = storage_options {
        for (key, value) in options {
            match key.as_str() {
                "service_account_path" => {
                    builder = builder.with_service_account_path(&value);
                }
                "service_account_key" => {
                    builder = builder.with_service_account_key(&value);
                }
                "project_id" => {
                    // Intentionally not applied to the builder; only a warning.
                    log::warn!(
                        "project_id should be set via service account or environment variables"
                    );
                }
                "application_credentials" => {
                    // SAFETY: mutating the process environment is not
                    // thread-safe if other threads read env vars concurrently.
                    // NOTE(review): assumes store creation happens before any
                    // concurrent env access — confirm at call sites.
                    unsafe {
                        std::env::set_var("GOOGLE_APPLICATION_CREDENTIALS", &value);
                    }
                }
                _ => {
                    log::warn!("Unknown GCS storage option: {key}");
                }
            }
        }
    }
    let gcs_store = builder.build()?;
    Ok((Arc::new(gcs_store), path, uri.to_string()))
}
/// Creates an Azure Blob store for `az://container/...` URIs.
///
/// Supported `storage_options` keys: `account_name`, `account_key`,
/// `sas_token`, `client_id`, `client_secret`, `tenant_id`; unknown keys are
/// logged and ignored.
#[cfg(feature = "cloud")]
fn create_azure_store(
    uri: &str,
    storage_options: Option<AHashMap<String, String>>,
) -> anyhow::Result<(Arc<dyn ObjectStore>, String, String)> {
    let (url, _) = parse_url_and_path(uri)?;
    let container = extract_host(&url, "Invalid Azure URI: missing container")?;
    let path = url.path().trim_start_matches('/').to_string();
    let mut builder =
        object_store::azure::MicrosoftAzureBuilder::new().with_container_name(container);
    if let Some(options) = storage_options {
        for (key, value) in options {
            match key.as_str() {
                "account_name" => {
                    builder = builder.with_account(&value);
                }
                "account_key" => {
                    builder = builder.with_access_key(&value);
                }
                "sas_token" => {
                    // Parse "k1=v1&k2=v2" pairs. Fix: `split_once` keeps any
                    // '=' inside the value (e.g. base64 padding in `sig`),
                    // which `split('=')` used to truncate; pairs without '='
                    // are skipped as before.
                    let query_pairs: Vec<(String, String)> = value
                        .split('&')
                        .filter_map(|pair| {
                            pair.split_once('=')
                                .map(|(k, v)| (k.to_string(), v.to_string()))
                        })
                        .collect();
                    builder = builder.with_sas_authorization(query_pairs);
                }
                "client_id" => {
                    builder = builder.with_client_id(&value);
                }
                "client_secret" => {
                    builder = builder.with_client_secret(&value);
                }
                "tenant_id" => {
                    builder = builder.with_tenant_id(&value);
                }
                _ => {
                    log::warn!("Unknown Azure storage option: {key}");
                }
            }
        }
    }
    let azure_store = builder.build()?;
    Ok((Arc::new(azure_store), path, uri.to_string()))
}
/// Creates an Azure Data Lake (ABFS) store.
///
/// Assumes URIs of the form `abfs://container@account.dfs.core.windows.net/path`:
/// the container comes from the userinfo component, the storage account from
/// the first host label. Supported `storage_options` keys match
/// `create_azure_store`; unknown keys are logged and ignored.
#[cfg(feature = "cloud")]
fn create_abfs_store(
    uri: &str,
    storage_options: Option<AHashMap<String, String>>,
) -> anyhow::Result<(Arc<dyn ObjectStore>, String, String)> {
    let (url, path) = parse_url_and_path(uri)?;
    let host = extract_host(&url, "Invalid ABFS URI: missing host")?;
    // Account name is the first dot-separated label of the host.
    let account = host
        .split('.')
        .next()
        .ok_or_else(|| anyhow::anyhow!("Invalid ABFS URI: cannot extract account from host"))?;
    let container = url
        .username()
        .split('@')
        .next()
        .ok_or_else(|| anyhow::anyhow!("Invalid ABFS URI: missing container"))?;
    let mut builder = object_store::azure::MicrosoftAzureBuilder::new()
        .with_account(account)
        .with_container_name(container);
    if let Some(options) = storage_options {
        for (key, value) in options {
            match key.as_str() {
                "account_name" => {
                    builder = builder.with_account(&value);
                }
                "account_key" => {
                    builder = builder.with_access_key(&value);
                }
                "sas_token" => {
                    // Parse "k1=v1&k2=v2" pairs. Fix: `split_once` keeps any
                    // '=' inside the value (e.g. base64 padding in `sig`),
                    // which `split('=')` used to truncate; pairs without '='
                    // are skipped as before.
                    let query_pairs: Vec<(String, String)> = value
                        .split('&')
                        .filter_map(|pair| {
                            pair.split_once('=')
                                .map(|(k, v)| (k.to_string(), v.to_string()))
                        })
                        .collect();
                    builder = builder.with_sas_authorization(query_pairs);
                }
                "client_id" => {
                    builder = builder.with_client_id(&value);
                }
                "client_secret" => {
                    builder = builder.with_client_secret(&value);
                }
                "tenant_id" => {
                    builder = builder.with_tenant_id(&value);
                }
                _ => {
                    log::warn!("Unknown ABFS storage option: {key}");
                }
            }
        }
    }
    let azure_store = builder.build()?;
    Ok((Arc::new(azure_store), path, uri.to_string()))
}
/// Creates an HTTP(S) object store rooted at the URI's origin.
///
/// No `storage_options` keys are currently supported for HTTP; any provided
/// keys are logged and ignored.
#[cfg(feature = "cloud")]
fn create_http_store(
    uri: &str,
    storage_options: Option<AHashMap<String, String>>,
) -> anyhow::Result<(Arc<dyn ObjectStore>, String, String)> {
    let (url, path) = parse_url_and_path(uri)?;
    // Fix: `host_str()` excludes the port, so `http://host:8080/...` used to
    // build a store pointing at `http://host`; append an explicit port.
    let mut base_url = format!("{}://{}", url.scheme(), url.host_str().unwrap_or(""));
    if let Some(port) = url.port() {
        base_url.push_str(&format!(":{port}"));
    }
    let builder = object_store::http::HttpBuilder::new().with_url(base_url);
    if let Some(options) = storage_options {
        for (key, _value) in options {
            log::warn!("Unknown HTTP storage option: {key}");
        }
    }
    let http_store = builder.build()?;
    Ok((Arc::new(http_store), path, uri.to_string()))
}
/// Parses `uri` and returns the parsed [`url::Url`] together with its path
/// component stripped of the leading `/`.
#[cfg(feature = "cloud")]
fn parse_url_and_path(uri: &str) -> anyhow::Result<(url::Url, String)> {
    let parsed = url::Url::parse(uri)?;
    let object_path = parsed.path().trim_start_matches('/').to_owned();
    Ok((parsed, object_path))
}
/// Returns the URL's host component (bucket/container/account name), or the
/// supplied error message when the URL has no host.
#[cfg(feature = "cloud")]
fn extract_host(url: &url::Url, error_msg: &str) -> anyhow::Result<String> {
    match url.host_str() {
        Some(host) => Ok(host.to_string()),
        None => anyhow::bail!("{error_msg}"),
    }
}
#[cfg(test)]
mod tests {
    //! Unit tests for store creation and the path/URI helper functions.
    //! Cloud-backed tests only build credentials locally; none perform I/O
    //! against a real remote service.
    #[cfg(feature = "cloud")]
    use ahash::AHashMap;
    use rstest::rstest;
    use super::*;
    // Local paths yield an empty base path and a `file://` URI.
    #[rstest]
    fn test_create_object_store_from_path_local() {
        let temp_dir = std::env::temp_dir().join("nautilus_test");
        std::fs::create_dir_all(&temp_dir).unwrap();
        let result = create_object_store_from_path(temp_dir.to_str().unwrap(), None);
        if let Err(e) = &result {
            println!("Error: {e:?}");
        }
        assert!(result.is_ok());
        let (_, base_path, uri) = result.unwrap();
        assert_eq!(base_path, "");
        assert_eq!(uri, format!("file://{}", temp_dir.to_str().unwrap()));
        std::fs::remove_dir_all(&temp_dir).ok();
    }
    // S3 options are applied without network access; base path is the key prefix.
    #[rstest]
    #[cfg(feature = "cloud")]
    fn test_create_object_store_from_path_s3() {
        let mut options = AHashMap::new();
        options.insert(
            "endpoint_url".to_string(),
            "https://test.endpoint.com".to_string(),
        );
        options.insert("region".to_string(), "us-west-2".to_string());
        options.insert("access_key_id".to_string(), "test_key".to_string());
        options.insert("secret_access_key".to_string(), "test_secret".to_string());
        let result = create_object_store_from_path("s3://test-bucket/path", Some(options));
        assert!(result.is_ok());
        let (_, base_path, uri) = result.unwrap();
        assert_eq!(base_path, "path");
        assert_eq!(uri, "s3://test-bucket/path");
    }
    #[rstest]
    #[cfg(feature = "cloud")]
    fn test_create_object_store_from_path_azure() {
        let mut options = AHashMap::new();
        options.insert("account_name".to_string(), "testaccount".to_string());
        options.insert("account_key".to_string(), "dGVzdGtleQ==".to_string());
        let result = create_object_store_from_path("az://container/path", Some(options));
        if let Err(e) = &result {
            println!("Azure Error: {e:?}");
        }
        assert!(result.is_ok());
        let (_, base_path, uri) = result.unwrap();
        assert_eq!(base_path, "path");
        assert_eq!(uri, "az://container/path");
    }
    // GCS may fail without local credentials, so both outcomes are accepted.
    #[rstest]
    #[cfg(feature = "cloud")]
    fn test_create_object_store_from_path_gcs() {
        let mut options = AHashMap::new();
        options.insert("project_id".to_string(), "test-project".to_string());
        let result = create_object_store_from_path("gs://test-bucket/path", Some(options));
        match result {
            Ok((_, base_path, uri)) => {
                assert_eq!(base_path, "path");
                assert_eq!(uri, "gs://test-bucket/path");
            }
            Err(e) => {
                let error_msg = format!("{e:?}");
                assert!(error_msg.contains("test-bucket") || error_msg.contains("credential"));
            }
        }
    }
    // Store construction succeeds with no storage options at all.
    #[rstest]
    #[cfg(feature = "cloud")]
    fn test_create_object_store_from_path_empty_options() {
        let result = create_object_store_from_path("s3://test-bucket/path", None);
        assert!(result.is_ok());
        let (_, base_path, uri) = result.unwrap();
        assert_eq!(base_path, "path");
        assert_eq!(uri, "s3://test-bucket/path");
    }
    #[rstest]
    #[cfg(feature = "cloud")]
    fn test_parse_url_and_path() {
        let result = parse_url_and_path("s3://bucket/path/to/file");
        assert!(result.is_ok());
        let (url, path) = result.unwrap();
        assert_eq!(url.scheme(), "s3");
        assert_eq!(url.host_str().unwrap(), "bucket");
        assert_eq!(path, "path/to/file");
    }
    #[rstest]
    #[cfg(feature = "cloud")]
    fn test_extract_host() {
        let url = url::Url::parse("s3://test-bucket/path").unwrap();
        let result = extract_host(&url, "Test error");
        assert!(result.is_ok());
        assert_eq!(result.unwrap(), "test-bucket");
    }
    // Covers pass-through URIs, POSIX/Windows/UNC absolute paths.
    #[rstest]
    fn test_normalize_path_to_uri() {
        assert_eq!(normalize_path_to_uri("/tmp/test"), "file:///tmp/test");
        assert_eq!(
            normalize_path_to_uri("C:\\tmp\\test"),
            "file:///C:/tmp/test"
        );
        assert_eq!(normalize_path_to_uri("C:/tmp/test"), "file:///C:/tmp/test");
        assert_eq!(
            normalize_path_to_uri("D:\\data\\file.txt"),
            "file:///D:/data/file.txt"
        );
        assert_eq!(
            normalize_path_to_uri("\\\\server\\share\\file"),
            "file://server/share/file"
        );
        assert_eq!(
            normalize_path_to_uri("s3://bucket/path"),
            "s3://bucket/path"
        );
        assert_eq!(
            normalize_path_to_uri("file:///tmp/test"),
            "file:///tmp/test"
        );
        assert_eq!(
            normalize_path_to_uri("https://example.com/path"),
            "https://example.com/path"
        );
    }
    #[rstest]
    fn test_is_absolute_path() {
        assert!(is_absolute_path("/tmp/test"));
        assert!(is_absolute_path("/"));
        assert!(is_absolute_path("C:\\tmp\\test"));
        assert!(is_absolute_path("C:/tmp/test"));
        assert!(is_absolute_path("D:\\"));
        assert!(is_absolute_path("Z:/"));
        assert!(is_absolute_path("\\\\server\\share"));
        assert!(is_absolute_path("\\\\localhost\\c$"));
        assert!(!is_absolute_path("tmp/test"));
        assert!(!is_absolute_path("./test"));
        assert!(!is_absolute_path("../test"));
        assert!(!is_absolute_path("test.txt"));
        assert!(!is_absolute_path(""));
        assert!(!is_absolute_path("C"));
        assert!(!is_absolute_path("C:"));
        assert!(!is_absolute_path("\\"));
    }
    #[rstest]
    fn test_path_to_file_uri() {
        assert_eq!(path_to_file_uri("/tmp/test"), "file:///tmp/test");
        assert_eq!(path_to_file_uri("/"), "file:///");
        assert_eq!(path_to_file_uri("C:\\tmp\\test"), "file:///C:/tmp/test");
        assert_eq!(path_to_file_uri("C:/tmp/test"), "file:///C:/tmp/test");
        assert_eq!(path_to_file_uri("D:\\"), "file:///D:/");
        assert_eq!(
            path_to_file_uri("\\\\server\\share\\file"),
            "file://server/share/file"
        );
        assert_eq!(
            path_to_file_uri("\\\\localhost\\c$\\test"),
            "file://localhost/c$/test"
        );
    }
}