use bytes::Bytes;
use parking_lot::RwLock;
use reqwest::multipart::{Form, Part};
use std::sync::Arc;
use super::auth::{TokenManager, BASE_URL};
use super::entity;
use super::types::*;
use crate::error::{AppError, Result};
use sea_orm::{
entity::*,
query::*,
sea_query::{Expr, Index},
*,
};
/// Runs one authenticated API request with retry handling and evaluates to the
/// parsed `ApiResponse<_>`.
///
/// * `$self` must expose `token_manager` with `get_token()` and `refresh_token()`.
/// * `$request_maker` is called with `&token` on every attempt and must return a
///   future resolving to a response whose body can be read with `.text()`.
///
/// The macro expands to code that uses `?`, `return Err(..)` and `.await`, so it can
/// only be used inside an `async` function returning this crate's `Result`.
///
/// Retry decisions are based on the `code` field of the decoded JSON body, not on
/// the HTTP status line.
macro_rules! retry_api {
($self:expr, $request_maker:expr) => {{
// At most MAX_RETRIES + 1 requests are sent in total.
const MAX_RETRIES: usize = 3;
const RETRY_DELAY: std::time::Duration = std::time::Duration::from_secs(1);
let mut final_response = None;
for attempt in 0..=MAX_RETRIES {
// The token is fetched again on every attempt so a refreshed token is picked up.
let token = $self.token_manager.get_token().await?;
let response = $request_maker(&token).await?;
let text = response.text().await?;
let api_response: ApiResponse<_> = match serde_json::from_str(&text) {
Ok(v) => v,
Err(e) => {
// A body that is not valid JSON for the expected type is not retried.
return Err(AppError::Pan123Api {
code: -1,
message: format!("Failed to parse response JSON: {}", e)
});
}
};
// Code 429: wait RETRY_DELAY and try again, or give up with an error on the last attempt.
if api_response.code == 429 {
if attempt < MAX_RETRIES {
tracing::warn!(
"Rate limited (429), waiting {}s before retry (attempt {}/{})",
RETRY_DELAY.as_secs(),
attempt + 1,
MAX_RETRIES
);
tokio::time::sleep(RETRY_DELAY).await;
continue;
} else {
tracing::error!(
"Rate limited (429) after {} retries, giving up",
MAX_RETRIES
);
return Err(AppError::Pan123Api {
code: api_response.code,
message: api_response.message,
});
}
}
// Code 401: refresh the token and retry immediately (no delay). A failed refresh is
// only logged. On the last attempt the 401 response falls through and is handed
// back to the caller unchanged.
if api_response.code == 401 {
if attempt < MAX_RETRIES {
tracing::warn!("Token expired (401), refreshing token and retrying (attempt {}/{})", attempt + 1, MAX_RETRIES);
if let Err(e) = $self.token_manager.refresh_token().await {
tracing::error!("Failed to refresh token on 401: {}", e);
}
continue;
}
}
final_response = Some(api_response);
break;
}
// Every path through the loop either returns, continues with attempts left, or
// stores a response before breaking, so this is always `Some`.
final_response.expect("Retry loop should always return a result")
}};
}
/// Client for the Pan123 API that mirrors remote file entries in a local database.
#[derive(Clone)]
pub struct Pan123Client {
/// Supplies bearer tokens, token refresh and the HTTP client used for requests.
token_manager: TokenManager,
/// Remote path of the repository root; sub-directory paths are built by appending to it.
repo_path: String,
/// Connection to the database that stores the cached file entries.
pub(crate) db: DatabaseConnection,
/// Upload domain, fetched on first use; the `Arc` makes it shared between clones.
upload_domain: Arc<RwLock<Option<String>>>,
}
impl Pan123Client {
/// Connects to the database at `database_url`, applies the SQLite pragmas and
/// prepares the tables used by the client and by its token manager.
///
/// # Errors
///
/// Returns `AppError::Internal` when connecting or applying the pragmas fails, and
/// propagates any error from the two `init_db` calls.
// NOTE(review): the pragmas are sent through a single `execute`; if the connection is
// pooled they presumably affect only the connection that ran them — confirm.
pub async fn new(
    client_id: String,
    client_secret: String,
    repo_path: String,
    database_url: &str,
) -> Result<Self> {
    const SQLITE_PRAGMAS: &str = "PRAGMA journal_mode=WAL;
PRAGMA synchronous=NORMAL;
PRAGMA cache_size=-256000; -- 256MB negative means pages
PRAGMA temp_store=MEMORY;
PRAGMA mmap_size=30000000000;";

    let mut connect_options = ConnectOptions::new(database_url.to_owned());
    connect_options.sqlx_logging_level(log::LevelFilter::Debug);

    let db = match Database::connect(connect_options).await {
        Ok(connection) => connection,
        Err(e) => {
            return Err(AppError::Internal(format!(
                "Failed to connect to database: {}",
                e
            )))
        }
    };

    let pragma_stmt =
        sea_orm::Statement::from_string(sea_orm::DatabaseBackend::Sqlite, SQLITE_PRAGMAS);
    if let Err(e) = db.execute(pragma_stmt).await {
        return Err(AppError::Internal(format!(
            "Failed to set SQLite pragmas: {}",
            e
        )));
    }

    let token_manager = TokenManager::new(client_id, client_secret, db.clone());
    let client = Self {
        token_manager,
        repo_path,
        db,
        upload_domain: Arc::new(RwLock::new(None)),
    };

    // Schema for the file cache first, then the token manager's own storage.
    client.init_db().await?;
    client.token_manager.init_db().await?;
    Ok(client)
}
/// Creates the file-entry table and the unique `(parent_id, name)` index if they do
/// not exist yet.
///
/// # Errors
///
/// Returns `AppError::Internal` when either statement fails.
async fn init_db(&self) -> Result<()> {
    let backend = self.db.get_database_backend();

    let mut table_stmt = Schema::new(backend).create_table_from_entity(entity::Entity);
    table_stmt.if_not_exists();

    let mut index_stmt = Index::create();
    index_stmt
        .name("idx_parent_name")
        .table(entity::Entity)
        .col(entity::Column::ParentId)
        .col(entity::Column::Name)
        .unique()
        .if_not_exists();

    if let Err(e) = self.db.execute(backend.build(&table_stmt)).await {
        return Err(AppError::Internal(format!(
            "Failed to initialize database: {}",
            e
        )));
    }
    if let Err(e) = self.db.execute(backend.build(&index_stmt)).await {
        return Err(AppError::Internal(format!(
            "Failed to create index: {}",
            e
        )));
    }
    Ok(())
}
/// Sends an authenticated `GET` to `url` through `retry_api!` and returns the decoded
/// API envelope. The envelope's own `code` is left for the caller to check.
async fn get<T>(&self, url: &str) -> Result<ApiResponse<T>>
where
    T: serde::de::DeserializeOwned,
{
    let envelope: ApiResponse<T> = retry_api!(self, |bearer| {
        let request = self
            .token_manager
            .http_client()
            .get(url)
            .header("Authorization", format!("Bearer {}", bearer))
            .header("Platform", "open_platform");
        request.send()
    });
    Ok(envelope)
}
/// Same as `get`, but overrides the request timeout with `Duration::MAX`, which in
/// practice removes the per-request time limit.
async fn get_no_timeout<T>(&self, url: &str) -> Result<ApiResponse<T>>
where
    T: serde::de::DeserializeOwned,
{
    let envelope: ApiResponse<T> = retry_api!(self, |bearer| {
        let request = self
            .token_manager
            .http_client()
            .get(url)
            .timeout(std::time::Duration::MAX)
            .header("Authorization", format!("Bearer {}", bearer))
            .header("Platform", "open_platform");
        request.send()
    });
    Ok(envelope)
}
/// Serializes `body` to JSON once and sends it as an authenticated `POST` to `url`
/// through `retry_api!`, returning the decoded API envelope.
///
/// # Errors
///
/// Fails when `body` cannot be serialized, or with whatever `retry_api!` returns.
async fn post<T, B>(&self, url: &str, body: &B) -> Result<ApiResponse<T>>
where
    T: serde::de::DeserializeOwned,
    B: serde::Serialize,
{
    // Serialized up front; each attempt sends its own copy of the payload.
    let payload = serde_json::to_string(body)?;
    let envelope: ApiResponse<T> = retry_api!(self, |bearer| {
        let request = self
            .token_manager
            .http_client()
            .post(url)
            .header("Authorization", format!("Bearer {}", bearer))
            .header("Platform", "open_platform")
            .header("Content-Type", "application/json")
            .body(payload.clone());
        request.send()
    });
    Ok(envelope)
}
/// Returns the upload domain, asking the API only when no value is cached yet.
///
/// The first entry of the returned domain list is used and stored for later calls.
///
/// # Errors
///
/// Returns `AppError::Pan123Api` for an unsuccessful API response and
/// `AppError::Internal` when the response carries no data or an empty list.
async fn get_upload_domain(&self) -> Result<String> {
    // The read guard lives only for this statement, so it is never held across an await.
    let cached = self.upload_domain.read().clone();
    if let Some(domain) = cached {
        return Ok(domain);
    }

    let url = format!("{}/upload/v2/file/domain", BASE_URL);
    let api_response: ApiResponse<Vec<String>> = retry_api!(self, |bearer| {
        let request = self
            .token_manager
            .http_client()
            .get(&url)
            .header("Authorization", format!("Bearer {}", bearer))
            .header("Platform", "open_platform");
        request.send()
    });

    if !api_response.is_success() {
        return Err(AppError::Pan123Api {
            code: api_response.code,
            message: api_response.message,
        });
    }

    let domain = api_response
        .data
        .ok_or_else(|| AppError::Internal("No upload domain in response".to_string()))?
        .into_iter()
        .next()
        .ok_or_else(|| AppError::Internal("Empty upload domain list".to_string()))?;
    tracing::info!("Fetched upload domain: {}", domain);

    *self.upload_domain.write() = Some(domain.clone());
    Ok(domain)
}
/// Lists the cached children of `parent_id`. Only the local database is consulted.
///
/// # Errors
///
/// Returns `AppError::Internal` when the database query fails.
pub async fn list_files(&self, parent_id: i64) -> Result<Vec<FileInfo>> {
    let rows = match entity::Entity::find()
        .filter(entity::Column::ParentId.eq(parent_id))
        .all(&self.db)
        .await
    {
        Ok(rows) => rows,
        Err(e) => {
            return Err(AppError::Internal(format!(
                "DB error in list_files: {}",
                e
            )))
        }
    };

    let mut listing = Vec::with_capacity(rows.len());
    for row in rows {
        let file_type = if row.is_dir { 1 } else { 0 };
        listing.push(FileInfo {
            file_id: row.file_id,
            filename: row.name,
            file_type,
            size: row.size,
            parent_file_id: row.parent_id,
            trashed: 0,
        });
    }
    Ok(listing)
}
/// Fetches every non-trashed child of `parent_id` from the remote API, following the
/// `lastFileId` cursor until the API reports `-1` or returns no data.
///
/// # Errors
///
/// Returns `AppError::Pan123Api` as soon as a page comes back unsuccessful, and
/// propagates request errors.
async fn fetch_files_from_api(&self, parent_id: i64) -> Result<Vec<FileInfo>> {
    let mut collected = Vec::new();
    let mut cursor: Option<i64> = None;
    let mut pages_seen = 0;

    loop {
        let mut url = format!(
            "{}/api/v2/file/list?parentFileId={}&limit=100",
            BASE_URL, parent_id
        );
        if let Some(id) = cursor {
            url.push_str(&format!("&lastFileId={}", id));
        }

        let page: ApiResponse<FileListData> = self.get_no_timeout(&url).await?;
        if !page.is_success() {
            return Err(AppError::Pan123Api {
                code: page.code,
                message: page.message,
            });
        }

        let Some(data) = page.data else {
            break;
        };
        let next_cursor = data.last_file_id;
        collected.extend(data.file_list.into_iter().filter(|f| !f.is_trashed()));

        pages_seen += 1;
        // Progress is logged once every 100 pages.
        if pages_seen % 100 == 0 {
            tracing::info!(
                "Fetched {} files so far (parent_id={})",
                collected.len(),
                parent_id
            );
        }

        if next_cursor == -1 {
            break;
        }
        cursor = Some(next_cursor);
    }

    Ok(collected)
}
/// Currently a no-op: the body is empty and `_parent_id` is ignored.
// NOTE(review): presumably kept so existing callers keep compiling — confirm.
pub fn invalidate_files_cache(&self, _parent_id: i64) {
}
pub async fn find_file(&self, parent_id: i64, name: &str) -> Result<Option<FileInfo>> {
let files = self.list_files(parent_id).await?;
Ok(files.into_iter().find(|f| f.filename == name))
}
async fn create_directory(&self, parent_id: i64, name: &str) -> Result<i64> {
tracing::debug!("Creating directory '{}' in parent {}", name, parent_id);
let request = CreateDirRequest {
name: name.to_string(),
parent_id,
};
let url = format!("{}/upload/v1/file/mkdir", BASE_URL);
let response: ApiResponse<CreateDirData> = self.post(&url, &request).await?;
if !response.is_success() {
tracing::debug!(
"mkdir failed: code={}, message='{}'",
response.code,
response.message
);
if response.code == 1 {
if let Some(existing) = self.find_file(parent_id, name).await? {
if existing.is_folder() {
let cached = entity::Entity::find()
.filter(entity::Column::ParentId.eq(parent_id))
.filter(entity::Column::Name.eq(name.to_string()))
.filter(entity::Column::IsDir.eq(true))
.one(&self.db)
.await
.map_err(|e| {
AppError::Internal(format!(
"DB error checking existing directory: {}",
e
))
})?;
if cached.is_none() {
let existing_dir = entity::ActiveModel {
file_id: Set(existing.file_id),
parent_id: Set(parent_id),
name: Set(name.to_string()),
is_dir: Set(true),
size: Set(existing.size),
etag: Set(None),
updated_at: Set(chrono::Utc::now().naive_utc()),
};
existing_dir.insert(&self.db).await.map_err(|e| {
AppError::Internal(format!(
"Failed to insert existing directory into DB: {}",
e
))
})?;
}
tracing::info!(
"Directory '{}' already exists with id {}",
name,
existing.file_id
);
return Ok(existing.file_id);
}
}
let files = self.fetch_files_from_api(parent_id).await?;
for f in &files {
entity::Entity::insert(entity::ActiveModel {
file_id: Set(f.file_id),
parent_id: Set(parent_id),
name: Set(f.filename.clone()),
is_dir: Set(f.is_folder()),
size: Set(f.size),
etag: Set(None),
updated_at: Set(chrono::Utc::now().naive_utc()),
})
.on_conflict(
sea_orm::sea_query::OnConflict::column(entity::Column::FileId)
.do_nothing()
.to_owned(),
)
.exec(&self.db)
.await
.map_err(|e| {
AppError::Internal(format!(
"Failed to refresh directory cache in mkdir fallback: {}",
e
))
})?;
}
if let Some(existing) = files
.into_iter()
.find(|f| f.filename == name && f.is_folder())
{
tracing::info!(
"Directory '{}' already exists with id {} (refreshed)",
name,
existing.file_id
);
return Ok(existing.file_id);
}
}
return Err(AppError::Pan123Api {
code: response.code,
message: response.message,
});
}
let data = response
.data
.ok_or_else(|| AppError::Internal("No data in mkdir response".to_string()))?;
let new_dir = entity::ActiveModel {
file_id: Set(data.dir_id),
parent_id: Set(parent_id),
name: Set(name.to_string()),
is_dir: Set(true),
size: Set(0),
etag: Set(None),
updated_at: Set(chrono::Utc::now().naive_utc()),
};
new_dir.insert(&self.db).await.map_err(|e| {
AppError::Internal(format!("Failed to insert new directory into DB: {}", e))
})?;
tracing::info!("Created directory '{}' with id {}", name, data.dir_id);
Ok(data.dir_id)
}
/// Resolves `path` to a directory id using only the local cache.
///
/// The walk starts at id `0` and descends one non-empty `/`-separated segment at a
/// time, matching directories only. An empty path (or one made of slashes) resolves
/// to `Some(0)`; a missing segment yields `Ok(None)`.
///
/// # Errors
///
/// Returns `AppError::Internal` when a database query fails.
pub async fn find_path_id(&self, path: &str) -> Result<Option<i64>> {
    let mut current_id: i64 = 0;
    // Dropping empty segments also takes care of leading, trailing and doubled slashes.
    for segment in path.split('/').filter(|s| !s.is_empty()) {
        let lookup = entity::Entity::find()
            .filter(entity::Column::ParentId.eq(current_id))
            .filter(entity::Column::Name.eq(segment.to_string()))
            .filter(entity::Column::IsDir.eq(true))
            .one(&self.db)
            .await
            .map_err(|e| AppError::Internal(format!("DB error in find_path_id: {}", e)))?;

        match lookup {
            Some(dir) => current_id = dir.file_id,
            None => return Ok(None),
        }
    }
    Ok(Some(current_id))
}
/// Returns the id of the directory at `path`, creating every missing segment.
///
/// If the whole path is already cached its id is returned right away. Otherwise the
/// path is walked from id `0`, reusing cached directories and calling
/// `create_directory` for each segment that is not cached.
///
/// # Errors
///
/// Returns `AppError::Internal` for database failures and propagates errors from
/// `create_directory`.
pub async fn ensure_path(&self, path: &str) -> Result<i64> {
    if let Some(id) = self.find_path_id(path).await? {
        return Ok(id);
    }

    let mut current_id: i64 = 0;
    // Dropping empty segments also takes care of leading, trailing and doubled slashes.
    for segment in path.split('/').filter(|s| !s.is_empty()) {
        let cached = entity::Entity::find()
            .filter(entity::Column::ParentId.eq(current_id))
            .filter(entity::Column::Name.eq(segment.to_string()))
            .filter(entity::Column::IsDir.eq(true))
            .one(&self.db)
            .await
            .map_err(|e| AppError::Internal(format!("DB error in ensure_path: {}", e)))?;

        current_id = match cached {
            Some(dir) => dir.file_id,
            None => self.create_directory(current_id, segment).await?,
        };
    }
    Ok(current_id)
}
/// Returns the leading (at most two) characters of `filename`, used as the name of
/// the `data/` sub-directory the file is stored in.
///
/// The cut is made on a character boundary, so names starting with multi-byte
/// characters no longer panic. For ASCII names the result is the first two bytes,
/// as before. Names shorter than two characters are returned whole.
fn data_subdir_prefix(filename: &str) -> &str {
    match filename.char_indices().nth(2) {
        // `end` is the byte offset where the third character starts.
        Some((end, _)) => &filename[..end],
        None => filename,
    }
}
/// Returns the id of `<repo_path>/data/<prefix>` for `filename`, creating the
/// directories if needed. The prefix comes from `data_subdir_prefix`.
pub async fn get_data_file_dir_id(&self, filename: &str) -> Result<i64> {
    let shard_path = format!(
        "{}/data/{}",
        self.repo_path,
        Self::data_subdir_prefix(filename)
    );
    self.ensure_path(&shard_path).await
}
/// Returns the id of the directory that stores files of `file_type`, creating it if
/// needed. Config files live in the repository root; every other type lives in
/// `<repo_path>/<dirname>`.
pub async fn get_type_dir_id(&self, file_type: ResticFileType) -> Result<i64> {
    if file_type.is_config() {
        return self.ensure_path(&self.repo_path).await;
    }
    let type_dir = format!("{}/{}", self.repo_path, file_type.dirname());
    self.ensure_path(&type_dir).await
}
/// Uploads `data` as `filename` into the remote directory `parent_id` with a single
/// multipart request, then records the file in the local cache.
///
/// The MD5 of the payload is sent as `etag` and stored with the cache entry. A cache
/// row with the same `(parent_id, name)` is updated in place.
///
/// Returns the file id reported by the API.
///
/// # Errors
///
/// Returns `AppError::Pan123Api` for an unsuccessful API response and
/// `AppError::Internal` when the response has no data, reports the upload as not
/// completed, or the cache update fails.
pub async fn upload_file(&self, parent_id: i64, filename: &str, data: Bytes) -> Result<i64> {
    let file_size = data.len() as i64;
    tracing::debug!(
        "Uploading file '{}' ({} bytes) to parent {}",
        filename,
        file_size,
        parent_id
    );
    let md5_hash = format!("{:x}", md5::compute(&data));
    let upload_domain = self.get_upload_domain().await?;
    let upload_url = format!("{}/upload/v2/file/single/create", upload_domain);

    let api_response: ApiResponse<SingleUploadData> = retry_api!(self, |token| {
        // The form is consumed by each send, so it is rebuilt per attempt. The payload
        // is copied straight from `data` here: one copy per attempt, no extra buffer
        // kept alive alongside it.
        let form = Form::new()
            .text("parentFileID", parent_id.to_string())
            .text("filename", filename.to_string())
            .text("etag", md5_hash.clone())
            .text("size", file_size.to_string())
            .text("duplicate", "2")
            .part(
                "file",
                Part::bytes(data.to_vec()).file_name(filename.to_string()),
            );
        self.token_manager
            .http_client()
            .post(&upload_url)
            .header("Authorization", format!("Bearer {}", token))
            .header("Platform", "open_platform")
            .multipart(form)
            .send()
    });

    if !api_response.is_success() {
        return Err(AppError::Pan123Api {
            code: api_response.code,
            message: api_response.message,
        });
    }
    let upload_data = api_response
        .data
        .ok_or_else(|| AppError::Internal("No data in upload response".to_string()))?;
    if !upload_data.completed {
        return Err(AppError::Internal("Upload not completed".to_string()));
    }
    let file_id = upload_data.file_id;

    // Upsert keyed on (parent_id, name) so re-uploading a name refreshes its row.
    entity::Entity::insert(entity::ActiveModel {
        file_id: Set(file_id),
        parent_id: Set(parent_id),
        name: Set(filename.to_string()),
        is_dir: Set(false),
        size: Set(file_size),
        etag: Set(Some(md5_hash)),
        updated_at: Set(chrono::Utc::now().naive_utc()),
    })
    .on_conflict(
        sea_orm::sea_query::OnConflict::columns([
            entity::Column::ParentId,
            entity::Column::Name,
        ])
        .update_columns([
            entity::Column::FileId,
            entity::Column::ParentId,
            entity::Column::Name,
            entity::Column::Size,
            entity::Column::Etag,
            entity::Column::UpdatedAt,
        ])
        .to_owned(),
    )
    .exec(&self.db)
    .await
    .map_err(|e| AppError::Internal(format!("Failed to sync file to DB: {}", e)))?;

    tracing::info!("Uploaded file '{}' with id {}", filename, file_id);
    Ok(file_id)
}
/// Asks the API for the download URL of `file_id`.
///
/// # Errors
///
/// API code `5066` is mapped to `AppError::NotFound`; any other unsuccessful code
/// becomes `AppError::Pan123Api`. A successful response without data yields
/// `AppError::Internal`.
pub async fn get_download_url(&self, file_id: i64) -> Result<String> {
    let endpoint = format!("{}/api/v1/file/download_info?fileId={}", BASE_URL, file_id);
    let response: ApiResponse<DownloadInfoData> = self.get(&endpoint).await?;

    if !response.is_success() {
        let failure = match response.code {
            5066 => AppError::NotFound(format!("File {} not found", file_id)),
            code => AppError::Pan123Api {
                code,
                message: response.message,
            },
        };
        return Err(failure);
    }

    match response.data {
        Some(info) => Ok(info.download_url),
        None => Err(AppError::Internal(
            "No data in download info response".to_string(),
        )),
    }
}
/// Downloads `file_id` and returns the body. When `range` is given, a
/// `Range: bytes=<start>-<end>` header is sent with the request.
///
/// # Errors
///
/// Returns `AppError::Internal` for any non-2xx HTTP status and propagates request
/// errors.
pub async fn download_file(&self, file_id: i64, range: Option<(u64, u64)>) -> Result<Bytes> {
    let download_url = self.get_download_url(file_id).await?;

    let base_request = self.token_manager.http_client().get(&download_url);
    let request = match range {
        Some((start, end)) => base_request.header("Range", format!("bytes={}-{}", start, end)),
        None => base_request,
    };

    let response = request.send().await?;
    let status = response.status();
    // 206 Partial Content is a 2xx status, so `is_success` already accepts it.
    if !status.is_success() {
        return Err(AppError::Internal(format!(
            "Download failed with status: {}",
            status
        )));
    }

    let body = response.bytes().await?;
    Ok(body)
}
/// Moves `file_id` to the remote trash and removes its row from the local cache.
///
/// # Errors
///
/// Returns `AppError::Pan123Api` for an unsuccessful API response (the cache is left
/// untouched in that case) and `AppError::Internal` when the cache delete fails.
pub async fn trash_file(&self, file_id: i64) -> Result<()> {
    tracing::debug!("Moving file {} to trash", file_id);

    let endpoint = format!("{}/api/v1/file/trash", BASE_URL);
    let payload = TrashRequest {
        file_ids: vec![file_id],
    };
    let response: ApiResponse<()> = self.post(&endpoint, &payload).await?;
    if !response.is_success() {
        return Err(AppError::Pan123Api {
            code: response.code,
            message: response.message,
        });
    }

    match entity::Entity::delete_by_id(file_id).exec(&self.db).await {
        Ok(_) => Ok(()),
        Err(e) => Err(AppError::Internal(format!(
            "Failed to delete trashed file from DB: {}",
            e
        ))),
    }
}
/// Deletes `file_id`: it is trashed first via `trash_file`, then removed through the
/// delete endpoint, and finally removed from the local cache. `_parent_id` is unused.
///
/// # Errors
///
/// Propagates errors from `trash_file`, returns `AppError::Pan123Api` for an
/// unsuccessful delete response and `AppError::Internal` when the cache delete fails.
pub async fn delete_file(&self, _parent_id: i64, file_id: i64) -> Result<()> {
    self.trash_file(file_id).await?;

    let payload = DeleteRequest {
        file_ids: vec![file_id],
    };
    let endpoint = format!("{}/api/v1/file/delete", BASE_URL);
    let response: ApiResponse<serde_json::Value> = self.post(&endpoint, &payload).await?;
    if !response.is_success() {
        return Err(AppError::Pan123Api {
            code: response.code,
            message: response.message,
        });
    }

    // `trash_file` has already removed the row; this delete is repeated after the
    // permanent delete succeeded.
    if let Err(e) = entity::Entity::delete_by_id(file_id).exec(&self.db).await {
        return Err(AppError::Internal(format!(
            "Failed to delete file from DB: {}",
            e
        )));
    }
    tracing::info!("Deleted file {} from persistent cache", file_id);
    Ok(())
}
/// Moves `file_ids` into `to_parent_id` on the remote and updates the parent of the
/// matching cache rows. An empty `file_ids` is a no-op.
///
/// # Errors
///
/// Returns `AppError::Pan123Api` for an unsuccessful API response and
/// `AppError::Internal` when the cache update fails.
pub async fn move_files(&self, file_ids: Vec<i64>, to_parent_id: i64) -> Result<()> {
    let moved = file_ids.len();
    if moved == 0 {
        return Ok(());
    }
    tracing::debug!("Moving {} files to parent {}", moved, to_parent_id);

    let endpoint = format!("{}/api/v1/file/move", BASE_URL);
    let payload = MoveRequest {
        file_ids: file_ids.clone(),
        to_parent_file_id: to_parent_id,
    };
    let response: ApiResponse<()> = self.post(&endpoint, &payload).await?;
    if !response.is_success() {
        return Err(AppError::Pan123Api {
            code: response.code,
            message: response.message,
        });
    }

    let update = entity::Entity::update_many()
        .col_expr(entity::Column::ParentId, Expr::value(to_parent_id))
        .filter(entity::Column::FileId.is_in(file_ids))
        .exec(&self.db)
        .await;
    if let Err(e) = update {
        return Err(AppError::Internal(format!(
            "Failed to update moved files in DB: {}",
            e
        )));
    }

    tracing::info!("Moved {} files to parent {}", moved, to_parent_id);
    Ok(())
}
/// Returns the entry named `filename` under `parent_id`, or `None` if there is none.
/// Delegates to `find_file` without any additional logic.
pub async fn get_file_info(&self, parent_id: i64, filename: &str) -> Result<Option<FileInfo>> {
self.find_file(parent_id, filename).await
}
/// Makes sure the repository root and its `Data`, `Keys`, `Locks`, `Snapshots` and
/// `Index` sub-directories exist, creating whatever is missing.
///
/// # Errors
///
/// Propagates the first error returned by `ensure_path`.
pub async fn init_repository(&self) -> Result<()> {
    let repo_root = self.repo_path.as_str();
    tracing::info!("Initializing repository at {}", repo_root);
    self.ensure_path(repo_root).await?;

    let layout = [
        ResticFileType::Data,
        ResticFileType::Keys,
        ResticFileType::Locks,
        ResticFileType::Snapshots,
        ResticFileType::Index,
    ];
    for kind in layout {
        let dir_path = format!("{}/{}", repo_root, kind.dirname());
        self.ensure_path(&dir_path).await?;
    }

    tracing::info!("Repository initialized successfully");
    Ok(())
}
/// Populates the local cache from the remote.
///
/// An existing, non-empty cache is reused unless `force_rebuild` is set. Otherwise the
/// cache is cleared, each component of `repo_path` is resolved through the API and
/// recorded, and everything below the repository root is crawled and inserted in
/// batches of 50 rows.
///
/// If a path component does not exist remotely the function logs a warning and
/// returns `Ok(())`; the cache has already been cleared at that point.
///
/// # Errors
///
/// Returns `AppError::Internal` for database failures and propagates API errors.
pub async fn warm_cache(&self, force_rebuild: bool) -> Result<()> {
    let started = std::time::Instant::now();

    let cached_entries = entity::Entity::find()
        .count(&self.db)
        .await
        .map_err(|e| AppError::Internal(format!("Failed to count cache entries: {}", e)))?;
    if cached_entries > 0 && !force_rebuild {
        tracing::info!(
            "Reusing existing cache with {} entries for repository: {}",
            cached_entries,
            self.repo_path
        );
        return Ok(());
    }

    let action = if force_rebuild {
        "Rebuilding"
    } else {
        "Initializing"
    };
    tracing::info!("{} cache for repository: {}", action, self.repo_path);

    entity::Entity::delete_many()
        .exec(&self.db)
        .await
        .map_err(|e| AppError::Internal(format!("DB clear failed: {}", e)))?;

    // Phase 1: resolve the repository path component by component, starting at id 0.
    let mut current_id: i64 = 0;
    for part in self.repo_path.split('/').filter(|s| !s.is_empty()) {
        let listing = self.fetch_files_from_api(current_id).await?;
        let Some(found) = listing
            .into_iter()
            .find(|f| f.filename == part && f.is_folder())
        else {
            tracing::warn!(
                "Path component {} not found during warm-up. Repository might not exist yet.",
                part
            );
            return Ok(());
        };

        let dir_id = found.file_id;
        let component = entity::ActiveModel {
            file_id: Set(dir_id),
            parent_id: Set(current_id),
            name: Set(found.filename),
            is_dir: Set(true),
            size: Set(0),
            etag: Set(None),
            updated_at: Set(chrono::Utc::now().naive_utc()),
        };
        if let Err(e) = entity::Entity::insert(component).exec(&self.db).await {
            return Err(AppError::Internal(format!(
                "Failed to insert path component into DB: {}",
                e
            )));
        }
        current_id = dir_id;
    }

    // Phase 2: crawl everything below the repository root. `pending` is used as a
    // stack, so the most recently discovered directory is visited next.
    let mut pending = vec![(current_id, self.repo_path.clone())];
    while let Some((parent_id, path)) = pending.pop() {
        tracing::debug!("Crawling directory: {}", path);
        let files = self.fetch_files_from_api(parent_id).await?;
        if files.is_empty() {
            continue;
        }

        let models: Vec<entity::ActiveModel> = files
            .iter()
            .map(|f| entity::ActiveModel {
                file_id: Set(f.file_id),
                parent_id: Set(parent_id),
                name: Set(f.filename.clone()),
                is_dir: Set(f.is_folder()),
                size: Set(f.size),
                etag: Set(None),
                updated_at: Set(chrono::Utc::now().naive_utc()),
            })
            .collect();

        for batch in models.chunks(50) {
            if let Err(e) = entity::Entity::insert_many(batch.to_vec())
                .exec(&self.db)
                .await
            {
                return Err(AppError::Internal(format!(
                    "Failed to batch insert files in warm_cache: {}",
                    e
                )));
            }
        }

        pending.extend(
            files
                .into_iter()
                .filter(|f| f.is_folder())
                .map(|f| (f.file_id, format!("{}/{}", path, f.filename))),
        );
    }

    tracing::info!("Cache warm-up completed in {:?}", started.elapsed());
    Ok(())
}
/// Lists every cached file stored one level below `<repo_path>/data`, i.e. the files
/// inside the data sub-directories. Only the local cache is consulted.
///
/// Returns an empty list when the `data` directory is not cached.
///
/// # Errors
///
/// Returns `AppError::Internal` when a database query fails.
pub async fn list_all_data_files(&self) -> Result<Vec<FileInfo>> {
    let data_path = format!("{}/data", self.repo_path);
    let data_dir_id = match self.find_path_id(&data_path).await? {
        Some(id) => id,
        None => return Ok(Vec::new()),
    };

    let subdir_ids: Vec<i64> = entity::Entity::find()
        .filter(entity::Column::ParentId.eq(data_dir_id))
        .filter(entity::Column::IsDir.eq(true))
        .all(&self.db)
        .await
        .map_err(|e| {
            AppError::Internal(format!("DB error in list_all_data_files (subdirs): {}", e))
        })?
        .into_iter()
        .map(|dir| dir.file_id)
        .collect();

    let rows = entity::Entity::find()
        .filter(entity::Column::ParentId.is_in(subdir_ids))
        .filter(entity::Column::IsDir.eq(false))
        .all(&self.db)
        .await
        .map_err(|e| {
            AppError::Internal(format!("DB error in list_all_data_files (files): {}", e))
        })?;

    let mut data_files = Vec::with_capacity(rows.len());
    for row in rows {
        data_files.push(FileInfo {
            file_id: row.file_id,
            filename: row.name,
            file_type: 0,
            size: row.size,
            parent_file_id: row.parent_id,
            trashed: 0,
        });
    }
    Ok(data_files)
}
}
/// Debug output lists only `token_manager` and `repo_path`; the database connection
/// and the cached upload domain are left out.
impl std::fmt::Debug for Pan123Client {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let mut out = f.debug_struct("Pan123Client");
        out.field("token_manager", &self.token_manager);
        out.field("repo_path", &self.repo_path);
        out.finish()
    }
}