use std::path::Path;
use std::time::Duration;
use interprocess::local_socket::tokio::prelude::LocalSocketStream;
use interprocess::local_socket::traits::tokio::Stream as _;
use interprocess::local_socket::{GenericFilePath, Name, ToFsName as _};
use serde::de::DeserializeOwned;
use serde_json::Value;
use crate::errors::{KodeBridgeError, Result};
use crate::http_client::RequestBuilder;
use crate::stream_client::{send_streaming_request, StreamingResponse};
use http::Method;
use std::str::FromStr as _;
use tracing::{debug, trace};
#[derive(Debug, Clone)]
pub struct StreamClientConfig {
pub default_timeout: Duration,
pub max_retries: usize,
pub retry_delay: Duration,
pub buffer_size: usize,
}
impl Default for StreamClientConfig {
fn default() -> Self {
Self {
default_timeout: Duration::from_secs(60),
max_retries: 3,
retry_delay: Duration::from_millis(100),
buffer_size: 8192,
}
}
}
pub struct IpcStreamClient {
name: Name<'static>,
config: StreamClientConfig,
}
pub struct StreamRequestBuilder<'a> {
client: &'a IpcStreamClient,
method: Method,
path: String,
body: Option<Value>,
timeout: Duration,
headers: Vec<(String, String)>,
}
pub struct StreamResponse {
inner: StreamingResponse,
}
impl StreamResponse {
const fn new(response: StreamingResponse) -> Self {
Self { inner: response }
}
pub const fn status(&self) -> u16 {
self.inner.status_code()
}
pub fn is_success(&self) -> bool {
self.inner.is_success()
}
pub fn is_client_error(&self) -> bool {
self.inner.is_client_error()
}
pub fn is_server_error(&self) -> bool {
self.inner.is_server_error()
}
pub async fn json_results<T>(self) -> Result<Vec<T>>
where
T: DeserializeOwned + Send,
{
self.inner.json(Duration::from_secs(30)).await
}
pub async fn json<T>(self, timeout: Duration) -> Result<Vec<T>>
where
T: DeserializeOwned + Send,
{
self.inner.json(timeout).await
}
pub async fn process_lines<F>(self, timeout: Duration, mut handler: F) -> Result<()>
where
F: FnMut(&str) -> std::result::Result<(), Box<dyn std::error::Error + Send + Sync>> + Send,
{
self.inner
.process_lines_with_timeout(timeout, |line| {
handler(line).map(|_| true) })
.await
}
pub async fn process_json<F, T>(self, timeout: Duration, handler: F) -> Result<Vec<T>>
where
F: FnMut(&str) -> Option<T> + Send,
T: Send + 'static,
{
self.inner.process_json(timeout, handler).await
}
pub async fn collect_text(self) -> Result<String> {
self.inner.collect_text().await
}
pub async fn collect_text_with_timeout(self, timeout: Duration) -> Result<String> {
self.inner.collect_text_with_timeout(timeout).await
}
pub fn into_inner(self) -> StreamingResponse {
self.inner
}
pub fn headers(&self) -> Value {
self.inner.headers_json()
}
}
impl IpcStreamClient {
pub fn new<P>(path: P) -> Result<Self>
where
P: AsRef<Path>,
{
Self::with_config(path, StreamClientConfig::default())
}
pub fn with_config<P>(path: P, config: StreamClientConfig) -> Result<Self>
where
P: AsRef<Path>,
{
let name = path
.as_ref()
.to_fs_name::<GenericFilePath>()
.map_err(|e| KodeBridgeError::configuration(format!("Invalid path: {}", e)))?
.into_owned();
Ok(Self { name, config })
}
async fn create_connection(&self) -> Result<LocalSocketStream> {
let mut last_error = None;
for attempt in 0..self.config.max_retries {
if attempt > 0 {
tokio::time::sleep(self.config.retry_delay).await;
}
match LocalSocketStream::connect(self.name.clone()).await {
Ok(stream) => {
debug!("Created streaming connection on attempt {}", attempt + 1);
return Ok(stream);
}
Err(e) => {
trace!("Streaming connection attempt {} failed: {}", attempt + 1, e);
last_error = Some(e);
}
}
}
Err(KodeBridgeError::connection(format!(
"Failed to create streaming connection after {} attempts: {}",
self.config.max_retries,
last_error
.map(|e| e.to_string())
.unwrap_or_else(|| "Unknown error".to_string())
)))
}
async fn send_request_internal(
&self,
method: &str,
path: &str,
body: Option<&Value>,
headers: &[(String, String)],
timeout: Duration,
) -> Result<StreamingResponse> {
let method =
Method::from_str(method).map_err(|e| KodeBridgeError::invalid_request(format!("Invalid method: {}", e)))?;
let mut builder = RequestBuilder::new(method, path.to_string());
for (key, value) in headers {
builder = builder.header(key, value);
}
if let Some(json_body) = body {
builder = builder.json(json_body)?;
}
let request = builder.build()?;
let result = tokio::time::timeout(timeout, async {
let stream = self.create_connection().await?;
send_streaming_request(stream, request).await
})
.await;
match result {
Ok(response) => response,
Err(_) => Err(KodeBridgeError::timeout(timeout.as_millis() as u64)),
}
}
pub fn get(&self, path: &str) -> StreamRequestBuilder<'_> {
StreamRequestBuilder::new(self, Method::GET, path)
}
pub fn post(&self, path: &str) -> StreamRequestBuilder<'_> {
StreamRequestBuilder::new(self, Method::POST, path)
}
pub fn put(&self, path: &str) -> StreamRequestBuilder<'_> {
StreamRequestBuilder::new(self, Method::PUT, path)
}
pub fn delete(&self, path: &str) -> StreamRequestBuilder<'_> {
StreamRequestBuilder::new(self, Method::DELETE, path)
}
pub fn patch(&self, path: &str) -> StreamRequestBuilder<'_> {
StreamRequestBuilder::new(self, Method::PATCH, path)
}
pub fn head(&self, path: &str) -> StreamRequestBuilder<'_> {
StreamRequestBuilder::new(self, Method::HEAD, path)
}
pub fn options(&self, path: &str) -> StreamRequestBuilder<'_> {
StreamRequestBuilder::new(self, Method::OPTIONS, path)
}
}
impl<'a> StreamRequestBuilder<'a> {
fn new(client: &'a IpcStreamClient, method: Method, path: &str) -> Self {
Self {
client,
method,
path: path.to_string(),
body: None,
timeout: client.config.default_timeout,
headers: Vec::new(),
}
}
pub fn json_body(mut self, body: &Value) -> Self {
self.body = Some(body.clone());
self
}
pub const fn timeout(mut self, timeout: Duration) -> Self {
self.timeout = timeout;
self
}
pub fn header<K, V>(mut self, key: K, value: V) -> Self
where
K: Into<String>,
V: Into<String>,
{
self.headers.push((key.into(), value.into()));
self
}
pub async fn send(self) -> Result<StreamResponse> {
let response = self
.client
.send_request_internal(
self.method.as_str(),
&self.path,
self.body.as_ref(),
&self.headers,
self.timeout,
)
.await?;
Ok(StreamResponse::new(response))
}
pub async fn json_results<T>(self) -> Result<Vec<T>>
where
T: DeserializeOwned + Send,
{
let response = self.send().await?;
response.json_results().await
}
pub async fn process_lines<F>(self, handler: F) -> Result<()>
where
F: FnMut(&str) -> std::result::Result<(), Box<dyn std::error::Error + Send + Sync>> + Send,
{
let timeout = self.timeout;
let response = self.send().await?;
response.process_lines(timeout, handler).await
}
}