use std::sync::Arc;
#[cfg(feature = "grpc")]
use anytype_rpc::client::default_grpc_endpoint;
#[cfg(feature = "grpc")]
use anytype_rpc::client::{AnytypeGrpcClient, AnytypeGrpcConfig};
#[cfg(feature = "grpc")]
use snafu::prelude::*;
#[cfg(feature = "grpc")]
use tokio::sync::Mutex;
use tracing::debug;
use crate::{
ANYTYPE_DESKTOP_URL, Result,
config::{
ANYTYPE_URL_ENV, DEFAULT_SERVICE_NAME, RATE_LIMIT_MAX_RETRIES_DEFAULT,
RATE_LIMIT_MAX_RETRIES_ENV,
},
http_client::HttpClient,
prelude::*,
};
#[derive(Debug, Clone)]
pub struct ClientConfig {
pub base_url: Option<String>,
pub app_name: String,
pub keystore: Option<String>,
pub keystore_service: Option<String>,
pub limits: ValidationLimits,
pub rate_limit_max_retries: u32,
pub disable_cache: bool,
pub verify: Option<VerifyConfig>,
#[cfg(feature = "grpc")]
pub grpc_endpoint: Option<String>,
}
impl Default for ClientConfig {
fn default() -> Self {
Self {
base_url: None,
app_name: DEFAULT_SERVICE_NAME.to_string(),
limits: ValidationLimits::default(),
rate_limit_max_retries: std::env::var(RATE_LIMIT_MAX_RETRIES_ENV)
.ok()
.and_then(|value| value.parse::<u32>().ok())
.unwrap_or(RATE_LIMIT_MAX_RETRIES_DEFAULT),
disable_cache: false,
verify: None,
keystore: None,
keystore_service: None,
#[cfg(feature = "grpc")]
grpc_endpoint: None,
}
}
}
impl ClientConfig {
#[must_use]
pub fn app_name(self, app_name: &str) -> Self {
Self {
app_name: app_name.to_string(),
..self
}
}
#[must_use]
pub fn limits(self, limits: ValidationLimits) -> Self {
Self { limits, ..self }
}
#[must_use]
pub fn disable_cache(self, disable_cache: bool) -> Self {
Self {
disable_cache,
..self
}
}
#[must_use]
pub fn ensure_available(self, verify: VerifyConfig) -> Self {
Self {
verify: Some(verify),
..self
}
}
#[must_use]
pub fn verify_config(self, verify: Option<VerifyConfig>) -> Self {
Self { verify, ..self }
}
#[cfg(feature = "grpc")]
#[must_use]
pub fn grpc_endpoint(mut self, endpoint: String) -> Self {
self.grpc_endpoint = Some(endpoint);
self
}
#[must_use]
pub fn get_limits(&self) -> &ValidationLimits {
&self.limits
}
#[must_use]
pub fn get_verify_config(&self) -> Option<&VerifyConfig> {
self.verify.as_ref()
}
}
#[derive(Clone)]
pub struct AnytypeClient {
pub(crate) client: Arc<HttpClient>,
pub(crate) config: ClientConfig,
pub(crate) keystore: KeyStore,
pub(crate) cache: Arc<AnytypeCache>,
#[cfg(feature = "grpc")]
pub(crate) grpc: Arc<Mutex<Option<AnytypeGrpcClient>>>,
}
impl std::fmt::Debug for AnytypeClient {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("AnytypeClient")
.field("config", &self.config)
.field("keystore:service", &self.keystore.service().to_string())
.field("cache", &self.cache)
.finish_non_exhaustive()
}
}
impl AnytypeClient {
pub fn new(app_name: &str) -> Result<Self> {
Self::with_config(ClientConfig::default().app_name(app_name))
}
pub fn with_config(config: ClientConfig) -> Result<Self> {
let client = reqwest::Client::builder().no_proxy();
Self::with_client(client, config)
}
pub fn with_client(builder: reqwest::ClientBuilder, config: ClientConfig) -> Result<Self> {
let base_url = config.base_url.clone().unwrap_or_else(|| {
std::env::var(ANYTYPE_URL_ENV).unwrap_or_else(|_| ANYTYPE_DESKTOP_URL.to_string())
});
let keystore_service = config
.keystore_service
.unwrap_or_else(|| config.app_name.clone());
let keystore = KeyStore::new(&keystore_service, config.keystore.as_deref().unwrap_or(""))?;
#[cfg(feature = "grpc")]
let grpc_endpoint = config.grpc_endpoint.unwrap_or_else(default_grpc_endpoint);
let http_creds = keystore.get_http_credentials()?;
let http_client = HttpClient::new(
builder,
base_url.clone(),
config.limits.clone(),
config.rate_limit_max_retries,
http_creds,
)?;
let cache = if config.disable_cache {
AnytypeCache::new_disabled()
} else {
AnytypeCache::default()
};
debug!(
base_url,
keystore = &keystore.id(),
keystore_service,
grpc_endpoint,
"new http client"
);
Ok(Self {
client: Arc::new(http_client),
config: ClientConfig {
base_url: Some(base_url),
keystore_service: Some(keystore_service),
#[cfg(feature = "grpc")]
grpc_endpoint: Some(grpc_endpoint),
..config
},
keystore,
cache: Arc::new(cache),
#[cfg(feature = "grpc")]
grpc: Arc::new(Mutex::new(None)),
})
}
#[must_use]
pub fn get_config(&self) -> &ClientConfig {
&self.config
}
#[must_use]
pub fn get_http_endpoint(&self) -> &str {
&self.client.base_url
}
#[cfg(feature = "grpc")]
#[must_use]
pub fn get_grpc_endpoint(&self) -> Option<String> {
self.config.grpc_endpoint.clone()
}
#[must_use]
pub fn api_version(&self) -> String {
crate::ANYTYPE_API_VERSION.to_string()
}
#[cfg(feature = "grpc")]
pub async fn grpc_client(&self) -> Result<AnytypeGrpcClient> {
let guard = self.grpc.lock().await;
if let Some(client) = guard.as_ref() {
return Ok(client.clone());
}
drop(guard);
let grpc_config = self
.config
.grpc_endpoint
.as_ref()
.map_or_else(AnytypeGrpcConfig::default, |endpoint| {
AnytypeGrpcConfig::new(endpoint.to_owned())
});
self.create_grpc_client(&grpc_config).await?;
let guard = self.grpc.lock().await;
guard.as_ref().cloned().context(GrpcUnavailableSnafu {
message: "gRPC client was not created".to_string(),
})
}
pub async fn ping_http(&self) -> Result<()> {
let _ = self.spaces().limit(1).list().await?;
Ok(())
}
#[cfg(feature = "grpc")]
async fn create_grpc_client(&self, config: &AnytypeGrpcConfig) -> Result<()> {
let creds = self.keystore.get_grpc_credentials()?;
let client = if let Some(token) = creds.session_token() {
AnytypeGrpcClient::from_token(config, token.to_string())
.await
.context(GrpcSnafu)?
} else if let Some(account_key) = creds.account_key() {
AnytypeGrpcClient::from_account_key(config, account_key.to_string())
.await
.context(GrpcSnafu)?
} else {
return GrpcUnavailableSnafu {
message: "no grpc token or account key in keystore".to_string(),
}
.fail();
};
{
let mut guard = self.grpc.lock().await;
*guard = Some(client);
}
Ok(())
}
#[cfg(feature = "grpc")]
pub async fn ping_grpc(&self) -> Result<()> {
use anytype_rpc::{
anytype::rpc::account::local_link::list_apps::Request as ListAppsRequest,
auth::with_token,
};
use tonic::Request;
let grpc = self.grpc_client().await?;
let mut commands = grpc.client_commands();
let request = Request::new(ListAppsRequest {});
let request = with_token(request, grpc.token()).map_err(|err| AnytypeError::Auth {
message: err.to_string(),
})?;
let response = commands
.account_local_link_list_apps(request)
.await
.map_err(|status| AnytypeError::Other {
message: format!("gRPC request failed: {status}"),
})?
.into_inner();
if let Some(error) = response.error
&& error.code != 0
{
return Err(AnytypeError::Other {
message: format!(
"grpc list apps failed: {} (code {})",
error.description, error.code
),
});
}
Ok(())
}
#[must_use]
pub fn http_metrics(&self) -> HttpMetricsSnapshot {
self.client.metrics_snapshot()
}
pub fn enable_cache(&self) {
self.cache.enable();
}
pub fn disable_cache(&self) {
self.cache.disable();
}
pub fn cache_is_enabled(&self) {
self.cache.is_enabled();
}
}
impl AnytypeClient {
#[doc(hidden)]
#[must_use]
pub fn cache(&self) -> Arc<AnytypeCache> {
self.cache.clone()
}
}
#[cfg(feature = "grpc")]
pub async fn find_grpc(program: Option<impl Into<String>>) -> Option<u16> {
let prefix = program.map_or_else(|| "anytype".to_string(), Into::into);
let ports = match lsof_listen_ports(&prefix).await {
Ok(ports) => ports,
Err(err) => {
debug!("lsof failed: {err}");
return None;
}
};
for port in &ports {
if probe_grpc_port(*port).await {
return Some(*port);
}
}
None
}
#[cfg(feature = "grpc")]
async fn lsof_listen_ports(prefix: &str) -> std::result::Result<Vec<u16>, String> {
let output = tokio::process::Command::new("lsof")
.args(["-Pni"])
.stdout(std::process::Stdio::piped())
.stderr(std::process::Stdio::null())
.output()
.await
.map_err(|err| format!("failed to run lsof: {err}"))?;
let stdout = String::from_utf8_lossy(&output.stdout);
let mut ports = Vec::new();
for line in stdout.lines() {
let Some(command) = line.split_whitespace().next() else {
continue;
};
if !command.starts_with(prefix) {
continue;
}
if !line.contains("LISTEN") {
continue;
}
if let Some(port) = extract_port(line)
&& !ports.contains(&port)
{
ports.push(port);
}
}
Ok(ports)
}
#[cfg(feature = "grpc")]
fn extract_port(line: &str) -> Option<u16> {
let before_listen = line.split("(LISTEN)").next()?;
let colon_pos = before_listen.rfind(':')?;
let after_colon = before_listen[colon_pos + 1..].trim();
after_colon.parse().ok()
}
#[cfg(feature = "grpc")]
async fn probe_grpc_port(port: u16) -> bool {
use anytype_rpc::anytype::{
ClientCommandsClient, rpc::app::get_version::Request as AppGetVersionRequest,
};
use std::time::Duration;
use tonic::transport::Endpoint;
let endpoint = match Endpoint::from_shared(format!("http://127.0.0.1:{port}")) {
Ok(ep) => ep.connect_timeout(Duration::from_secs(2)),
Err(_) => return false,
};
let channel = match endpoint.connect().await {
Ok(ch) => ch,
Err(_) => return false,
};
let mut client = ClientCommandsClient::new(channel);
client
.app_get_version(tonic::Request::new(AppGetVersionRequest {}))
.await
.is_ok()
}
#[cfg(all(feature = "grpc", test))]
mod find_grpc_tests {
use super::*;
#[test]
fn extract_port_ipv4() {
let line = "anytype 12345 user 25u IPv4 0x1234 0t0 TCP 127.0.0.1:31010 (LISTEN)";
assert_eq!(extract_port(line), Some(31010));
}
#[test]
fn extract_port_wildcard() {
let line = "anytype 12345 user 25u IPv4 0x1234 0t0 TCP *:31010 (LISTEN)";
assert_eq!(extract_port(line), Some(31010));
}
#[test]
fn extract_port_ipv6() {
let line = "anytypeH 12345 user 26u IPv6 0x5678 0t0 TCP [::1]:31010 (LISTEN)";
assert_eq!(extract_port(line), Some(31010));
}
#[test]
fn extract_port_no_listen() {
let line =
"anytype 12345 user 25u IPv4 0x1234 0t0 TCP 127.0.0.1:31010 (ESTABLISHED)";
assert_eq!(extract_port(line), None);
}
#[tokio::test]
async fn lsof_listen_ports_filters_prefix() {
let ports = lsof_listen_ports("zzz_nonexistent_program_zzz")
.await
.unwrap();
assert!(ports.is_empty());
}
}