use std::collections::{HashMap, VecDeque};
use fe2o3_amqp_types::definitions::{Fields, Handle, TransferNumber};
use serde_amqp::primitives::Symbol;
use slab::Slab;
use tokio::sync::mpsc;
use crate::{
connection::{AllocSessionError, ConnectionHandle},
control::SessionControl,
endpoint::OutgoingChannel,
session::{engine::SessionEngine, SessionState},
util::Constant,
Session,
};
use super::{error::BeginError, SessionHandle, DEFAULT_WINDOW};
pub(crate) const DEFAULT_SESSION_CONTROL_BUFFER_SIZE: usize = 128;
pub(crate) const DEFAULT_SESSION_MUX_BUFFER_SIZE: usize = u16::MAX as usize;
#[derive(Debug, Clone)]
pub struct Builder {
pub next_outgoing_id: TransferNumber,
pub incoming_window: TransferNumber,
pub outgoing_window: TransferNumber,
pub handle_max: Handle,
pub offered_capabilities: Option<Vec<Symbol>>,
pub desired_capabilities: Option<Vec<Symbol>>,
pub properties: Option<Fields>,
pub buffer_size: usize,
#[cfg(not(target_arch = "wasm32"))]
#[cfg(all(feature = "transaction", feature = "acceptor"))]
pub(crate) control_link_acceptor: Option<ControlLinkAcceptor>,
}
impl Default for Builder {
fn default() -> Self {
Self {
next_outgoing_id: 0,
incoming_window: DEFAULT_WINDOW,
outgoing_window: DEFAULT_WINDOW,
handle_max: Default::default(),
offered_capabilities: None,
desired_capabilities: None,
properties: None,
buffer_size: DEFAULT_SESSION_MUX_BUFFER_SIZE,
#[cfg(not(target_arch = "wasm32"))]
#[cfg(all(feature = "transaction", feature = "acceptor"))]
control_link_acceptor: None,
}
}
}
cfg_transaction! {
cfg_acceptor! {
use crate::transaction::{
coordinator::ControlLinkAcceptor, manager::TransactionManager, session::TxnSession,
};
impl Builder {
pub(crate) fn into_txn_session(
self,
control: mpsc::Sender<SessionControl>,
outgoing: mpsc::Sender<crate::link::LinkFrame>,
outgoing_channel: OutgoingChannel,
control_link_acceptor: ControlLinkAcceptor,
local_state: SessionState,
) -> TxnSession<Session> {
let txn_manager = TransactionManager::new(outgoing, control_link_acceptor);
let session = Session {
outgoing_channel,
local_state,
initial_outgoing_id: Constant::new(self.next_outgoing_id),
next_outgoing_id: self.next_outgoing_id,
incoming_window: self.incoming_window,
outgoing_window: self.outgoing_window,
handle_max: self.handle_max,
incoming_channel: None,
next_incoming_id: 0,
remote_incoming_window: 0,
remote_incoming_window_exhausted_buffer: VecDeque::new(),
remote_outgoing_window: 0,
offered_capabilities: self.offered_capabilities,
desired_capabilities: self.desired_capabilities,
properties: self.properties,
link_name_by_output_handle: Slab::new(),
link_by_name: HashMap::new(),
link_by_input_handle: HashMap::new(),
delivery_tag_by_id: HashMap::new(),
};
TxnSession {
control,
session,
txn_manager,
}
}
}
}
}
impl Builder {
pub fn new() -> Self {
Self::default()
}
pub(crate) fn into_session(
self,
outgoing_channel: OutgoingChannel,
local_state: SessionState,
) -> Session {
Session {
outgoing_channel,
local_state,
initial_outgoing_id: Constant::new(self.next_outgoing_id),
next_outgoing_id: self.next_outgoing_id,
incoming_window: self.incoming_window,
outgoing_window: self.outgoing_window,
handle_max: self.handle_max,
incoming_channel: None,
next_incoming_id: 0,
remote_incoming_window: 0,
remote_incoming_window_exhausted_buffer: VecDeque::new(),
remote_outgoing_window: 0,
offered_capabilities: self.offered_capabilities,
desired_capabilities: self.desired_capabilities,
properties: self.properties,
link_name_by_output_handle: Slab::new(),
link_by_name: HashMap::new(),
link_by_input_handle: HashMap::new(),
delivery_tag_by_id: HashMap::new(),
}
}
pub fn next_outgoing_id(mut self, value: TransferNumber) -> Self {
self.next_outgoing_id = value;
self
}
pub fn incoming_window(mut self, value: TransferNumber) -> Self {
self.incoming_window = value;
self
}
pub fn outgoing_widnow(mut self, value: TransferNumber) -> Self {
self.outgoing_window = value;
self
}
pub fn handle_max(mut self, value: impl Into<Handle>) -> Self {
self.handle_max = value.into();
self
}
pub fn add_offered_capabilities(mut self, capability: impl Into<Symbol>) -> Self {
match &mut self.offered_capabilities {
Some(capabilities) => capabilities.push(capability.into()),
None => self.offered_capabilities = Some(vec![capability.into()]),
}
self
}
pub fn set_offered_capabilities(mut self, capabilities: Vec<Symbol>) -> Self {
self.offered_capabilities = Some(capabilities);
self
}
pub fn add_desired_capabilities(mut self, capability: impl Into<Symbol>) -> Self {
match &mut self.desired_capabilities {
Some(capabilities) => capabilities.push(capability.into()),
None => self.desired_capabilities = Some(vec![capability.into()]),
}
self
}
pub fn set_desired_capabilities(mut self, capabilities: Vec<Symbol>) -> Self {
self.desired_capabilities = Some(capabilities);
self
}
pub fn properties(mut self, properties: Fields) -> Self {
self.properties = Some(properties);
self
}
pub fn buffer_size(mut self, buffer_size: usize) -> Self {
self.buffer_size = buffer_size;
self
}
#[cfg(not(target_arch = "wasm32"))]
pub async fn begin(
self,
connection: &mut ConnectionHandle<()>,
) -> Result<SessionHandle<()>, BeginError> {
let local_state = SessionState::Unmapped;
let (session_control_tx, session_control_rx) =
mpsc::channel::<SessionControl>(DEFAULT_SESSION_CONTROL_BUFFER_SIZE);
let (incoming_tx, incoming_rx) = mpsc::channel(self.buffer_size);
let (outgoing_tx, outgoing_rx) = mpsc::channel(self.buffer_size);
let outgoing_channel = match connection.allocate_session(incoming_tx).await {
Ok(channel) => channel,
Err(alloc_error) => match alloc_error {
AllocSessionError::IllegalState => return Err(BeginError::IllegalConnectionState),
AllocSessionError::ChannelMaxReached => {
return Err(BeginError::LocalChannelMaxReached);
}
},
};
#[cfg(not(all(feature = "transaction", feature = "acceptor")))]
let engine_handle = {
let session = self.into_session(outgoing_channel, local_state);
let engine = SessionEngine::begin_client_session(
connection.control.clone(),
session,
session_control_rx,
incoming_rx,
connection.outgoing.clone(),
outgoing_rx,
)
.await?;
engine.spawn()
};
#[cfg(all(feature = "transaction", feature = "acceptor"))]
let engine_handle = {
let mut this = self;
match this.control_link_acceptor.take() {
Some(control_link_acceptor) => {
let session = this.into_txn_session(
session_control_tx.clone(),
outgoing_tx.clone(),
outgoing_channel,
control_link_acceptor,
local_state,
);
let engine = SessionEngine::begin_client_session(
connection.control.clone(),
session,
session_control_rx,
incoming_rx,
connection.outgoing.clone(),
outgoing_rx,
)
.await?;
engine.spawn()
}
None => {
let session = this.into_session(outgoing_channel, local_state);
let engine = SessionEngine::begin_client_session(
connection.control.clone(),
session,
session_control_rx,
incoming_rx,
connection.outgoing.clone(),
outgoing_rx,
)
.await?;
engine.spawn()
}
}
};
let handle = SessionHandle {
is_ended: false,
control: session_control_tx,
engine_handle,
outgoing: outgoing_tx,
link_listener: (),
};
Ok(handle)
}
#[cfg(target_arch = "wasm32")]
pub async fn begin_on_local_set(
self,
connection: &mut ConnectionHandle<()>,
local_set: &tokio::task::LocalSet,
) -> Result<SessionHandle<()>, BeginError> {
let local_state = SessionState::Unmapped;
let (session_control_tx, session_control_rx) =
mpsc::channel::<SessionControl>(DEFAULT_SESSION_CONTROL_BUFFER_SIZE);
let (incoming_tx, incoming_rx) = mpsc::channel(self.buffer_size);
let (outgoing_tx, outgoing_rx) = mpsc::channel(self.buffer_size);
let outgoing_channel = match connection.allocate_session(incoming_tx).await {
Ok(channel) => channel,
Err(alloc_error) => match alloc_error {
AllocSessionError::IllegalState => return Err(BeginError::IllegalConnectionState),
AllocSessionError::ChannelMaxReached => {
return Err(BeginError::LocalChannelMaxReached);
}
},
};
let engine_handle = {
let session = self.into_session(outgoing_channel, local_state);
let engine = SessionEngine::begin_client_session(
connection.control.clone(),
session,
session_control_rx,
incoming_rx,
connection.outgoing.clone(),
outgoing_rx,
)
.await?;
engine.spawn_local(local_set)
};
let handle = SessionHandle {
is_ended: false,
control: session_control_tx,
engine_handle,
outgoing: outgoing_tx,
link_listener: (),
};
Ok(handle)
}
}