use std::collections::HashMap;
use std::io::{self, Read};
use std::pin::Pin;
use std::sync::{Arc, Mutex};
use std::task::{Context, Poll};
use chrono::{DateTime, Utc};
use serde_json::Value;
use tokio::io::{AsyncRead, AsyncWriteExt};
use crate::FileSource;
use crate::error::{HonchoError, Result};
use crate::session::PeerSpec;
use crate::types::message::MessageSearchOptions;
use crate::types::session::{SessionConfiguration, SessionPeerConfig};
use super::runtime::block_on;
struct ErrorAwareReader {
inner: tokio::io::DuplexStream,
error_slot: Arc<Mutex<Option<io::Error>>>,
}
impl AsyncRead for ErrorAwareReader {
fn poll_read(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &mut tokio::io::ReadBuf<'_>,
) -> Poll<io::Result<()>> {
let this = self.get_mut();
match Pin::new(&mut this.inner).poll_read(cx, buf) {
Poll::Ready(Ok(())) => {
if buf.filled().is_empty()
&& let Some(err) = this.error_slot.lock().ok().and_then(|mut g| g.take())
{
return Poll::Ready(Err(err));
}
Poll::Ready(Ok(()))
}
other => other,
}
}
}
#[derive(Clone)]
pub struct Session {
inner: crate::Session,
}
impl std::fmt::Debug for Session {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("Session")
.field("id", &self.inner.id())
.field("is_active", &self.inner.is_active())
.finish()
}
}
impl Session {
pub(crate) fn new(inner: crate::Session) -> Self {
Self { inner }
}
#[must_use]
pub fn id(&self) -> &str {
self.inner.id()
}
#[must_use]
pub fn is_active(&self) -> bool {
self.inner.is_active()
}
#[must_use]
pub fn metadata(&self) -> Option<HashMap<String, Value>> {
self.inner.metadata()
}
#[must_use]
pub fn configuration(&self) -> Option<SessionConfiguration> {
self.inner.configuration()
}
pub fn refresh(&self) -> Result<()> {
block_on(self.inner.refresh())
}
pub fn get_metadata(&self) -> Result<HashMap<String, Value>> {
block_on(self.inner.get_metadata())
}
pub fn set_metadata(&self, metadata: HashMap<String, Value>) -> Result<()> {
block_on(self.inner.set_metadata(metadata))
}
pub fn get_configuration(&self) -> Result<SessionConfiguration> {
block_on(self.inner.get_configuration())
}
pub fn set_configuration(&self, configuration: &SessionConfiguration) -> Result<()> {
block_on(self.inner.set_configuration(configuration))
}
pub fn get_configuration_raw(&self) -> Result<HashMap<String, Value>> {
block_on(self.inner.get_configuration_raw())
}
pub fn set_configuration_raw(&self, configuration: HashMap<String, Value>) -> Result<()> {
block_on(self.inner.set_configuration_raw(configuration))
}
pub fn add_peer(&self, id: impl Into<String>) -> Result<()> {
block_on(self.inner.add_peer(id))
}
pub fn add_peers(&self, specs: impl IntoIterator<Item = impl Into<PeerSpec>>) -> Result<()> {
block_on(self.inner.add_peers(specs))
}
pub fn set_peers(&self, specs: impl IntoIterator<Item = impl Into<PeerSpec>>) -> Result<()> {
block_on(self.inner.set_peers(specs))
}
pub fn remove_peers(&self, ids: impl IntoIterator<Item = impl Into<String>>) -> Result<()> {
block_on(self.inner.remove_peers(ids))
}
pub fn peers(&self) -> Result<Vec<super::Peer>> {
block_on(self.inner.peers()).map(|peers| peers.into_iter().map(super::Peer::new).collect())
}
pub fn get_peer_configuration(&self, peer_id: &str) -> Result<SessionPeerConfig> {
block_on(self.inner.get_peer_configuration(peer_id))
}
pub fn set_peer_configuration(&self, peer_id: &str, config: &SessionPeerConfig) -> Result<()> {
block_on(self.inner.set_peer_configuration(peer_id, config))
}
pub fn add_messages(
&self,
messages: Vec<crate::types::message::MessageCreate>,
) -> Result<Vec<crate::Message>> {
block_on(self.inner.add_messages(messages))
}
pub fn messages(&self) -> Result<Vec<crate::Message>> {
block_on(async {
let page = self.inner.messages().await?;
super::iter::collect_all_pages(page).await
})
}
pub fn delete(&self) -> Result<()> {
block_on(self.inner.delete())
}
pub fn clone_session(&self) -> Result<Session> {
block_on(self.inner.clone_session()).map(Session::new)
}
pub fn clone_session_with_message(&self, message_id: &str) -> Result<Session> {
block_on(self.inner.clone_session_with_message(message_id)).map(Session::new)
}
pub fn get_message(&self, id: &str) -> Result<crate::Message> {
block_on(self.inner.get_message(id))
}
pub fn update_message(
&self,
id: &str,
metadata: HashMap<String, Value>,
) -> Result<crate::Message> {
block_on(self.inner.update_message(id, metadata))
}
pub fn context(&self) -> Result<crate::types::session::SessionContext> {
block_on(self.inner.context())
}
pub fn context_with_options(
&self,
options: &crate::types::session::SessionContextOptions,
) -> Result<crate::types::session::SessionContext> {
block_on(self.inner.context_with_options(options))
}
pub fn summaries(&self) -> Result<crate::types::session::SessionSummaries> {
block_on(self.inner.summaries())
}
pub fn search(&self, query: &str) -> Result<Vec<crate::Message>> {
block_on(self.inner.search(query))
}
pub fn search_with_options(
&self,
options: &MessageSearchOptions,
) -> Result<Vec<crate::Message>> {
block_on(self.inner.search_with_options(options))
}
pub fn representation(&self, peer_id: &str) -> Result<String> {
block_on(self.inner.representation(peer_id))
}
pub fn queue_status(
&self,
observer_id: Option<&str>,
sender_id: Option<&str>,
) -> Result<crate::types::dream::QueueStatus> {
block_on(self.inner.queue_status(observer_id, sender_id))
}
#[must_use]
pub fn upload_file(&self, source: impl Into<FileSource>) -> BlockingUploadFileBuilder<'_> {
BlockingUploadFileBuilder {
inner: self.inner.upload_file(source),
reader_handle: None,
}
}
pub fn upload_file_streamed(
&self,
filename: impl Into<String>,
reader: impl Read + Send + 'static,
content_type: impl Into<String>,
) -> BlockingUploadFileBuilder<'_> {
let (mut tx, rx) = tokio::io::duplex(8192);
let filename_owned = filename.into();
let content_type_owned = content_type.into();
let error_slot: Arc<Mutex<Option<io::Error>>> = Arc::new(Mutex::new(None));
let slot_clone = error_slot.clone();
let handle = super::runtime::handle();
let reader_handle = tokio::task::spawn_blocking(move || {
let mut reader = reader;
handle.block_on(async {
let mut buf = [0u8; 8192];
loop {
match reader.read(&mut buf) {
Ok(0) => return,
Ok(n) => {
if tx.write_all(&buf[..n]).await.is_err() {
return;
}
}
Err(e) => {
let _ = slot_clone.lock().map(|mut g| *g = Some(e));
return;
}
}
}
});
});
let wrapped_rx = ErrorAwareReader {
inner: rx,
error_slot,
};
BlockingUploadFileBuilder {
inner: self.inner.upload_file(FileSource::stream(
filename_owned,
wrapped_rx,
content_type_owned,
)),
reader_handle: Some(reader_handle),
}
}
}
pub struct BlockingUploadFileBuilder<'a> {
inner: crate::UploadFileBuilder<'a>,
reader_handle: Option<tokio::task::JoinHandle<()>>,
}
impl std::fmt::Debug for BlockingUploadFileBuilder<'_> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("BlockingUploadFileBuilder")
.finish_non_exhaustive()
}
}
impl BlockingUploadFileBuilder<'_> {
#[must_use]
pub fn peer(self, id: impl Into<String>) -> Self {
Self {
inner: self.inner.peer(id),
reader_handle: self.reader_handle,
}
}
#[must_use]
pub fn metadata(self, value: Value) -> Self {
Self {
inner: self.inner.metadata(value),
reader_handle: self.reader_handle,
}
}
#[must_use]
pub fn configuration(self, value: Value) -> Self {
Self {
inner: self.inner.configuration(value),
reader_handle: self.reader_handle,
}
}
#[must_use]
pub fn created_at(self, dt: DateTime<Utc>) -> Self {
Self {
inner: self.inner.created_at(dt),
reader_handle: self.reader_handle,
}
}
pub fn send(self) -> Result<Vec<crate::Message>> {
let result = block_on(self.inner.send());
if let Some(handle) = self.reader_handle {
let join_result = block_on(handle);
if result.is_ok()
&& let Err(join_error) = join_result
{
return Err(HonchoError::Io(std::io::Error::other(
join_error.to_string(),
)));
}
}
result
}
}
#[must_use]
pub struct BlockingSessionRepresentationBuilder {
inner: super::super::session::SessionRepresentationBuilder,
}
impl BlockingSessionRepresentationBuilder {
pub fn target(mut self, target: impl Into<String>) -> Self {
self.inner = self.inner.target(target);
self
}
pub fn search_query(mut self, query: impl Into<String>) -> Self {
self.inner = self.inner.search_query(query);
self
}
pub fn search_top_k(mut self, k: u32) -> Self {
self.inner = self.inner.search_top_k(k);
self
}
pub fn search_max_distance(mut self, d: f64) -> Self {
self.inner = self.inner.search_max_distance(d);
self
}
pub fn include_most_frequent(mut self, v: bool) -> Self {
self.inner = self.inner.include_most_frequent(v);
self
}
pub fn max_conclusions(mut self, m: u32) -> Self {
self.inner = self.inner.max_conclusions(m);
self
}
pub fn send(self) -> Result<String> {
block_on(self.inner.send())
}
}
impl Session {
pub fn representation_builder(
&self,
peer_id: impl Into<String>,
) -> BlockingSessionRepresentationBuilder {
BlockingSessionRepresentationBuilder {
inner: self.inner.representation_builder(peer_id.into()),
}
}
}