#![cfg_attr(test, allow(unused_imports, unused_variables, dead_code))]
pub mod client;
pub mod config;
pub mod controller;
pub mod service;
pub mod model;
pub mod error;
pub mod tool;
pub mod grpc;
use crate::controller::*;
use actix_files::Files;
use actix_web::dev::ServerHandle;
use actix_web::web::Data;
use actix_web::{middleware, middleware::Logger, web, App, HttpServer};
use config::anttp_config::AntTpConfig;
use log::info;
use once_cell::sync::Lazy;
use std::{env, io};
use std::path::Path;
use std::sync::Arc;
use std::time::Duration;
use actix_web::http::Method;
use async_job::Runner;
use evmlib::Network::{ArbitrumOne, ArbitrumSepoliaTest};
use evmlib::wallet::Wallet;
use foyer::{BlockEngineConfig, Compression, DeviceBuilder, FsDeviceBuilder, HybridCache, HybridCacheBuilder, HybridCachePolicy, LfuConfig, PsyncIoEngineConfig, RecoverMode};
use indexmap::IndexMap;
use mockall_double::double;
use rmcp_actix_web::transport::{StreamableHttpService};
use rmcp::transport::streamable_http_server::session::local::LocalSessionManager;
#[cfg(not(grpc_disabled))]
use tokio::sync::oneshot;
use tokio::sync::Mutex;
#[cfg(not(grpc_disabled))]
use tonic::transport::Server;
use utoipa::OpenApi;
use utoipa_swagger_ui::SwaggerUi;
#[double]
use crate::client::CachingClient;
#[double]
use crate::client::ChunkCachingClient;
#[double]
use crate::client::PublicDataCachingClient;
#[double]
use crate::client::TArchiveCachingClient;
#[double]
use crate::client::StreamingClient;
#[double]
use crate::client::ArchiveCachingClient;
use crate::client::client_harness::ClientHarness;
use client::command::executor::Executor;
use crate::client::command::command_details::CommandDetails;
use crate::service::crypto_service::{CryptoService, Crypto, CryptoContent};
use crate::service::chunk_service::{Chunk, ChunkService};
use crate::service::command_service::CommandService;
#[double]
use crate::service::file_service::FileService;
use crate::service::archive_service::{ArchiveService, ArchiveForm, PublicArchiveForm, Upload, ArchiveResponse};
use crate::model::archive::ArchiveType;
use crate::model::resolve::Resolve;
use crate::service::tarchive_service::TarchiveService;
use crate::service::public_data_service::PublicDataService;
#[double]
use crate::service::resolver_service::ResolverService;
use crate::tool::McpTool;
#[cfg(not(grpc_disabled))]
use crate::grpc::archive_handler::{ArchiveHandler, ArchiveServiceServer};
#[cfg(not(grpc_disabled))]
use crate::grpc::chunk_handler::{ChunkHandler, ChunkServiceServer};
#[cfg(not(grpc_disabled))]
use crate::grpc::command_handler::{CommandHandler, CommandServiceServer};
#[cfg(not(grpc_disabled))]
use crate::grpc::public_data_handler::{PublicDataHandler, PublicServiceServer};
#[cfg(not(grpc_disabled))]
use crate::grpc::tarchive_handler::{TarchiveHandler, TarchiveServiceServer};
#[cfg(not(grpc_disabled))]
use crate::grpc::resolver_handler::{ResolverHandler, ResolverServiceServer};
#[cfg(not(grpc_disabled))]
use crate::grpc::crypto_handler::{CryptoHandler, CryptoServiceServer};
static ACTIX_SERVER_HANDLE: Lazy<Mutex<Option<ServerHandle>>> = Lazy::new(|| Mutex::new(None));
#[cfg(not(grpc_disabled))]
static TONIC_SERVER_SHUTDOWN_TX: Lazy<Mutex<Option<oneshot::Sender<()>>>> = Lazy::new(|| Mutex::new(None));
const API_BASE: &'static str = "/anttp-0/";
#[cfg(not(test))]
pub async fn run_server(ant_tp_config: AntTpConfig) -> io::Result<()> {
#[derive(OpenApi)]
#[openapi(
paths(
chunk_controller::get_chunk,
chunk_controller::get_chunk_binary,
chunk_controller::post_chunk,
chunk_controller::post_chunk_binary,
/*pointer_controller::get_pointer,
pointer_controller::post_pointer,
pointer_controller::put_pointer,*/
archive_controller::get_archive,
archive_controller::get_archive_root,
archive_controller::put_archive,
archive_controller::put_archive_root,
archive_controller::delete_archive,
archive_controller::push_archive,
/*public_archive_controller::get_public_archive,
public_archive_controller::get_public_archive_root,
public_archive_controller::post_public_archive,
public_archive_controller::put_public_archive,
public_archive_controller::delete_public_archive,
public_archive_controller::push_public_archive,*/
tarchive_controller::get_tarchive,
tarchive_controller::get_tarchive_root,
tarchive_controller::post_tarchive,
tarchive_controller::put_tarchive,
tarchive_controller::delete_tarchive,
tarchive_controller::push_tarchive,
/*public_scratchpad_controller::get_public_scratchpad,
public_scratchpad_controller::post_public_scratchpad,
public_scratchpad_controller::put_public_scratchpad,*/
/*register_controller::get_register,
register_controller::get_register_history,
register_controller::post_register,
register_controller::put_register,*/
/*private_scratchpad_controller::get_private_scratchpad,
private_scratchpad_controller::post_private_scratchpad,
private_scratchpad_controller::put_private_scratchpad,*/
/*graph_controller::get_graph_entry,
graph_controller::post_graph_entry,*/
public_data_controller::get_public_data,
public_data_controller::post_public_data,
public_data_controller::push_public_data,
command_controller::get_commands,
/*pnr_controller::get_pnr,
pnr_controller::post_mutable_pnr,
pnr_controller::post_immutable_pnr,
pnr_controller::put_pnr,
pnr_controller::put_pnr_record,
pnr_controller::patch_pnr,*/
/*key_value_controller::post_key_value,
key_value_controller::post_key_value_binary,
key_value_controller::get_key_value,
key_value_controller::get_key_value_binary,*/
resolver_controller::resolve,
crypto_controller::post_verify,
crypto_controller::post_sign,
/*crypto_controller::post_encrypt,
crypto_controller::post_decrypt*/
),
components(
schemas(PublicArchiveForm, ArchiveForm, Upload, ArchiveResponse, Chunk, ArchiveType, Resolve, Crypto, CryptoContent)
)
)]
struct ApiDoc;
let listen_address = ant_tp_config.listen_address.clone();
let https_listen_address = ant_tp_config.https_listen_address.clone();
#[cfg(not(grpc_disabled))]
let grpc_listen_address = ant_tp_config.grpc_listen_address.clone();
let wallet_private_key = ant_tp_config.wallet_private_key.clone();
let evm_network = match ant_tp_config.evm_network.to_lowercase().as_str() {
"arbitrumsepoliatest" => ArbitrumSepoliaTest,
_ => ArbitrumOne
};
let client_harness_data = Data::new(Mutex::new(ClientHarness::new(evm_network.clone(), ant_tp_config.clone())));
let evm_wallet_data = if !wallet_private_key.is_empty() {
Data::new(Wallet::new_from_private_key(evm_network, wallet_private_key.as_str())
.expect("Failed to instantiate EvmWallet."))
} else {
Data::new(Wallet::new_with_random_wallet(evm_network))
};
let hybrid_cache_data: Data<HybridCache<String, Vec<u8>>> = Data::new(build_foyer_cache(&ant_tp_config).await);
let command_status_data = Data::new(Mutex::new(IndexMap::<u128, CommandDetails>::with_capacity(ant_tp_config.command_buffer_size * 2)));
let command_executor = Executor::start(ant_tp_config.command_buffer_size, command_status_data.clone()).await;
let command_executor_data = Data::new(command_executor.clone());
let caching_client = CachingClient::new(client_harness_data, ant_tp_config.clone(), hybrid_cache_data.clone(), command_executor_data.clone());
let caching_client_data = Data::new(caching_client.clone());
let chunk_caching_client = ChunkCachingClient::new(caching_client.clone());
let streaming_client = StreamingClient::new(chunk_caching_client.clone(), ant_tp_config.clone());
let streaming_client_data = Data::new(streaming_client.clone());
let archive_caching_client = ArchiveCachingClient::new(caching_client.clone(), streaming_client.clone());
let tarchive_caching_client = TArchiveCachingClient::new(caching_client.clone(), streaming_client.clone());
let public_data_caching_client = PublicDataCachingClient::new(caching_client.clone(), streaming_client.clone());
let resolver_service_data = Data::new(
ResolverService::new(archive_caching_client.clone(), ant_tp_config.cached_mutable_ttl)
);
Runner::new().add(Box::new(caching_client_data.get_ref().clone())).run().await;
let file_service = FileService::new(chunk_caching_client.clone(), ant_tp_config.download_threads);
let tarchive_service_data = Data::new(TarchiveService::new(
PublicDataService::new(public_data_caching_client.clone(), resolver_service_data.get_ref().clone()),
tarchive_caching_client.clone(),
file_service.clone(),
resolver_service_data.get_ref().clone(),
ant_tp_config.clone()
));
let command_service_data = Data::new(CommandService::new(command_status_data.clone()));
let chunk_service_data = Data::new(ChunkService::new(chunk_caching_client.clone(), resolver_service_data.get_ref().clone()));
let public_data_service_data = Data::new(PublicDataService::new(public_data_caching_client.clone(), resolver_service_data.get_ref().clone()));
let archive_service_data = Data::new(ArchiveService::new(
tarchive_service_data.get_ref().clone(),
resolver_service_data.get_ref().clone(),
archive_caching_client.clone(),
file_service.clone()
));
let crypto_service_data = Data::new(CryptoService::new(ant_tp_config.clone()));
let mcp_tool = McpTool::new(
command_service_data.clone(),
chunk_service_data.clone(),
public_data_service_data.clone(),
archive_service_data.clone(),
tarchive_service_data.clone(),
resolver_service_data.clone(),
crypto_service_data.clone(),
evm_wallet_data.clone()
);
let mcp_tool_service = StreamableHttpService::builder()
.service_factory(Arc::new(move || { Ok(mcp_tool.clone()) }))
.session_manager(Arc::new(LocalSessionManager::default())) .stateful_mode(true) .sse_keep_alive(Duration::from_secs(30)) .build();
#[cfg(not(grpc_disabled))]
if !ant_tp_config.grpc_disabled && !ant_tp_config.uploads_disabled {
let chunk_handler = ChunkHandler::new(chunk_service_data.clone(), evm_wallet_data.clone());
let command_handler = CommandHandler::new(command_service_data.clone());
let public_data_handler = PublicDataHandler::new(public_data_service_data.clone(), evm_wallet_data.clone());
let archive_handler = ArchiveHandler::new(archive_service_data.clone(), evm_wallet_data.clone());
let tarchive_handler = TarchiveHandler::new(tarchive_service_data.clone(), public_data_service_data.clone(), evm_wallet_data.clone());
let resolver_handler = ResolverHandler::new(resolver_service_data.clone());
let crypto_handler = CryptoHandler::new(crypto_service_data.clone());
let (tx, rx) = oneshot::channel::<()>();
{
let mut guard = TONIC_SERVER_SHUTDOWN_TX.lock().await;
*guard = Some(tx);
}
info!("Starting Tonic (gRPC) listener on port {}", grpc_listen_address);
tokio::task::spawn(async move {
let result = Server::builder()
.add_service(ChunkServiceServer::new(chunk_handler))
.add_service(CommandServiceServer::new(command_handler))
.add_service(PublicServiceServer::new(public_data_handler))
.add_service(ArchiveServiceServer::new(archive_handler))
.add_service(TarchiveServiceServer::new(tarchive_handler))
.add_service(ResolverServiceServer::new(resolver_handler))
.add_service(CryptoServiceServer::new(crypto_handler))
.serve_with_shutdown(grpc_listen_address, async {
rx.await.ok();
})
.await;
if let Err(e) = result {
log::error!("gRPC server error: {}", e);
}
});
} else {
#[cfg(not(grpc_disabled))]
info!("Tonic (gRPC) listener disabled");
}
#[cfg(grpc_disabled)]
{
info!("Tonic (gRPC) listener disabled (not built)");
}
let actix_config = ant_tp_config.clone();
let actix_server = HttpServer::new(move || {
let logger = Logger::default();
let mut app = App::new()
.wrap(logger)
.wrap(middleware::Compress::default()) .service(
SwaggerUi::new("/swagger-ui/{_:.*}")
.url("/api-docs/openapi.json", ApiDoc::openapi()),
)
.route(
"",
web::method(Method::CONNECT).to(connect_controller::forward)
)
.route(
format!("{}chunk/{{address}}", API_BASE).as_str(),
web::get().to(chunk_controller::get_chunk),
)
.route(
format!("{}binary/chunk/{{address}}", API_BASE).as_str(),
web::get().to(chunk_controller::get_chunk_binary),
)
.route(
format!("{}binary/public_data/{{address}}", API_BASE).as_str(),
web::get().to(public_data_controller::get_public_data)
)
.route(
format!("{}command", API_BASE).as_str(),
web::get().to(command_controller::get_commands)
)
.route(
format!("{}archive/{{address}}", API_BASE).as_str(),
web::get().to(archive_controller::get_archive_root),
)
.route(
format!("{}archive/{{address}}/{{path:.*}}", API_BASE).as_str(),
web::get().to(archive_controller::get_archive),
)
.route(
format!("{}tarchive/{{address}}", API_BASE).as_str(),
web::get().to(tarchive_controller::get_tarchive_root),
)
.route(
format!("{}tarchive/{{address}}/{{path:.*}}", API_BASE).as_str(),
web::get().to(tarchive_controller::get_tarchive),
)
.route(
format!("{}resolve/{{name}}", API_BASE).as_str(),
web::get().to(resolver_controller::resolve)
)
.route(
format!("{}crypto/verify/{{public_key}}", API_BASE).as_str(),
web::post().to(crypto_controller::post_verify)
)
.route(
format!("{}crypto/sign", API_BASE).as_str(),
web::post().to(crypto_controller::post_sign)
)
.route(
"/{path:.*}",
web::get().to(file_controller::get_public_data),
)
.route(
"/{path:.*}",
web::head().to(file_controller::head_public_data),
)
.app_data(Data::new(actix_config.clone()))
.app_data(caching_client_data.clone())
.app_data(streaming_client_data.clone())
.app_data(evm_wallet_data.clone())
.app_data(hybrid_cache_data.clone())
.app_data(command_status_data.clone())
.app_data(command_service_data.clone())
.app_data(chunk_service_data.clone())
.app_data(crypto_service_data.clone())
.app_data(tarchive_service_data.clone())
.app_data(archive_service_data.clone())
.app_data(public_data_service_data.clone())
.app_data(resolver_service_data.clone())
.app_data(web::PayloadConfig::new(1024 * 1024 * 10));
if !actix_config.uploads_disabled {
if !actix_config.mcp_tools_disabled {
app = app
.service(
web::scope("/mcp-0")
.service(mcp_tool_service.clone().scope())
);
}
app = app
.route(
format!("{}chunk", API_BASE).as_str(),
web::post().to(chunk_controller::post_chunk),
)
.route(
format!("{}binary/chunk", API_BASE).as_str(),
web::post().to(chunk_controller::post_chunk_binary),
)
.route(
format!("{}multipart/archive/{{address}}", API_BASE).as_str(),
web::put().to(archive_controller::put_archive_root),
)
.route(
format!("{}multipart/archive/{{address}}/{{path:.*}}", API_BASE).as_str(),
web::put().to(archive_controller::put_archive),
)
.route(
format!("{}archive/{{address}}/{{path:.*}}", API_BASE).as_str(),
web::delete().to(archive_controller::delete_archive),
)
.route(
format!("{}archive/{{address}}", API_BASE).as_str(),
web::post().to(archive_controller::push_archive),
)
.route(
format!("{}multipart/tarchive", API_BASE).as_str(),
web::post().to(tarchive_controller::post_tarchive_root),
)
.route(
format!("{}multipart/tarchive/{{path:.*}}", API_BASE).as_str(),
web::post().to(tarchive_controller::post_tarchive),
)
.route(
format!("{}multipart/tarchive/{{address}}", API_BASE).as_str(),
web::put().to(tarchive_controller::put_tarchive_root),
)
.route(
format!("{}multipart/tarchive/{{address}}/{{path:.*}}", API_BASE).as_str(),
web::put().to(tarchive_controller::put_tarchive),
)
.route(
format!("{}tarchive/{{address}}/{{path:.*}}", API_BASE).as_str(),
web::delete().to(tarchive_controller::delete_tarchive),
)
.route(
format!("{}public_data/{{address}}", API_BASE).as_str(),
web::post().to(public_data_controller::push_public_data),
)
.route(
format!("{}tarchive/{{address}}", API_BASE).as_str(),
web::post().to(tarchive_controller::push_tarchive),
)
.route(
format!("{}binary/public_data", API_BASE).as_str(),
web::post().to(public_data_controller::post_public_data)
)
};
if actix_config.static_file_directory != "" {
app.service(Files::new(
"/static",
actix_config.static_file_directory.clone(),
))
} else {
app
}
})
.bind(listen_address)?
.run();
let mut guard = ACTIX_SERVER_HANDLE.lock().await;
*guard = Some(actix_server.handle());
info!("Starting Actix (HTTP) listener");
actix_server.await
}
async fn build_foyer_cache(app_config: &AntTpConfig) -> HybridCache<String, Vec<u8>> {
let cache_dir = if app_config.map_cache_directory.is_empty() {
env::temp_dir().to_str().unwrap().to_owned() + "/anttp/cache/"
} else {
app_config.map_cache_directory.clone()
};
let memory_cache_size = if app_config.immutable_memory_cache_size > 0 { app_config.immutable_memory_cache_size } else { 1 };
let builder = HybridCacheBuilder::new()
.with_name("anttp-hybrid-cache")
.with_flush_on_close(true)
.with_policy(HybridCachePolicy::WriteOnInsertion)
.memory(memory_cache_size)
.with_shards(app_config.download_threads)
.with_eviction_config(LfuConfig::default())
.storage();
if app_config.immutable_disk_cache_size > 0 {
let device = FsDeviceBuilder::new(Path::new(cache_dir.as_str()))
.with_capacity(app_config.immutable_disk_cache_size * 1024 * 1024)
.build().expect("Failed to build FsDevice");
builder
.with_io_engine_config(PsyncIoEngineConfig::new())
.with_engine_config(BlockEngineConfig::new(device))
.with_recover_mode(RecoverMode::Quiet)
.with_compression(Compression::None) .with_spawner(tokio::runtime::Builder::new_multi_thread()
.enable_all()
.worker_threads(4)
.max_blocking_threads(8)
.build()
.unwrap()
.into()
)
.build().await.expect("Failed to build hybrid in-memory/on-disk cache")
} else {
builder.build().await.expect("Failed to build in-memory cache")
}
}
pub async fn stop_server() -> Result<(), String> {
let actix_handle_opt = {
let mut guard = ACTIX_SERVER_HANDLE.lock().await;
guard.take()
};
#[cfg(not(grpc_disabled))]
{
let mut guard = TONIC_SERVER_SHUTDOWN_TX.lock().await;
if let Some(tx) = guard.take() {
info!("Stopping gRPC server...");
let _ = tx.send(());
}
}
if let Some(handle) = actix_handle_opt {
info!("Stopping Actix server gracefully...");
handle.stop(true).await;
info!("Actix server stopped");
Ok(())
} else {
Err("Actix server handle not found or already stopped".to_string())
}
}