use std::collections::HashMap;
use std::future::Future;
use std::hash::Hash;
use std::net::SocketAddr;
use std::sync::{Arc, Weak};
use async_trait::async_trait;
use log::{debug, info, warn};
use crate::bytes::{ByteBuffer, ByteBufferMut, DynamicByteBuffer};
use crate::cache::SharedMap;
#[cfg(any(feature = "fast_software", feature = "fast_hardware"))]
use crate::certificate::ObfuscationBufferContainer;
use crate::certificate::{ServerKeyPair, ServerSecret};
use crate::crypto::{PAYLOAD_CRYPTO_OVERHEAD, ServerCryptoTool, UserCryptoState, UserServerState, verify_transcript_with_key};
use crate::flow::decoy::{DecoyFactory, random_decoy_factory};
use crate::flow::probe::ProbeFactory;
use crate::flow::server::{RawReceivedPacket, ServerFlowManager};
use crate::flow::{FlowConfig, FlowControllerError};
use crate::session::SessionControllerError;
use crate::session::server::{IncomingPacket, OutgoingRouter, ServerSessionManager};
use crate::settings::{Settings, keys};
use crate::socket::error::ServerSocketError;
use crate::tailer::{IdentityType, PacketFlags, ReturnCode, ServerConnectionHandler, Tailer};
use crate::utils::random::jittered_chunk_size;
use crate::utils::socket::Socket;
use crate::utils::sync::{AsyncExecutor, Mutex, NotifyQueueReceiver, NotifyQueueSender, RwLock, assert_runtime, create_bounded_notify_queue, create_notify_queue};
use crate::utils::unix_timestamp_ms;
/// Per-flow configuration consumed by `ListenerBuilder` when a listener is built.
///
/// The constructors populate exactly one of `socket` / `address`: `new` wraps an existing
/// socket, `with_address` records an address that is bound during the build.
pub struct ServerFlowConfiguration<T: IdentityType + Clone, AE: AsyncExecutor> {
// Caller-supplied socket, if the configuration was created with `new`.
socket: Option<Socket>,
// Address to bind at build time, if the configuration was created with `with_address`.
address: Option<SocketAddr>,
// Flow parameters; checked against the MTU during the build.
config: FlowConfig,
// Requested number of sockets; only consulted when binding by address (on Linux, values
// above 1 go through `Socket::bind_reuse_port`).
reader_count: usize,
// Per-flow decoy factory; the builder's default is used when this is `None`.
decoy_factory: Option<DecoyFactory<T, AE>>,
// Per-flow probe factory; the builder's default (if any) is used when this is `None`.
probe_factory: Option<ProbeFactory<AE>>,
}
impl<T: IdentityType + Clone + 'static, AE: AsyncExecutor + 'static> ServerFlowConfiguration<T, AE> {
    /// Creates a configuration around an already-created `socket`.
    ///
    /// The reader count starts at 1 and no decoy or probe factory is set.
    pub fn new(config: FlowConfig, socket: Socket) -> Self {
        let socket = Some(socket);
        Self {
            config,
            socket,
            address: None,
            reader_count: 1,
            decoy_factory: None,
            probe_factory: None,
        }
    }

    /// Creates a configuration that binds `address` when the listener is built.
    ///
    /// The reader count starts at 1 and no decoy or probe factory is set.
    pub fn with_address(config: FlowConfig, address: SocketAddr) -> Self {
        let address = Some(address);
        Self {
            config,
            address,
            socket: None,
            reader_count: 1,
            decoy_factory: None,
            probe_factory: None,
        }
    }

    /// Sets the requested reader count; a request of zero is stored as 1.
    pub fn with_reader_count(self, count: usize) -> Self {
        let reader_count = if count == 0 { 1 } else { count };
        Self { reader_count, ..self }
    }

    /// Uses `factory` as this flow's decoy factory instead of the builder default.
    pub fn with_decoy_factory(self, factory: DecoyFactory<T, AE>) -> Self {
        Self {
            decoy_factory: Some(factory),
            ..self
        }
    }

    /// Uses the decoy factory produced for the communication mode `DP`.
    pub fn with_decoy<DP: crate::flow::decoy::DecoyCommunicationMode<T, AE> + 'static>(self) -> Self {
        self.with_decoy_factory(crate::flow::decoy::decoy_factory::<T, AE, DP>())
    }

    /// Uses `factory` as this flow's probe factory instead of the builder default.
    pub fn with_probe_factory(self, factory: ProbeFactory<AE>) -> Self {
        Self {
            probe_factory: Some(factory),
            ..self
        }
    }

    /// Uses the probe factory produced for the probe handler type `PM`.
    pub fn with_probe<PM: crate::flow::probe::ActiveProbeHandler<AE> + Default + 'static>(self) -> Self {
        self.with_probe_factory(crate::flow::probe::probe_factory::<AE, PM>())
    }
}
/// Builder that collects settings, flow configurations and defaults, then produces a `Listener`.
pub struct ListenerBuilder<T: IdentityType + Clone + Eq + Hash + Send + ToString + 'static, AE: AsyncExecutor + 'static, IG: ServerConnectionHandler<T>> {
// Optional settings; `Settings::default()` is used at build time when unset.
settings: Option<Arc<Settings<AE>>>,
// Flows to create; building fails with `NoFlows` when this is empty.
flow_configs: Vec<ServerFlowConfiguration<T, AE>>,
// Server secret derived from the key pair passed to `new`.
secret: ServerSecret<'static>,
// Handler used by the listener to verify versions and generate client identities.
identity_generator: IG,
// Decoy factory applied to flows that do not configure their own.
default_decoy_factory: DecoyFactory<T, AE>,
// Probe factory applied to flows that do not configure their own, if any.
default_probe_factory: Option<ProbeFactory<AE>>,
}
impl<T: IdentityType + Clone + Eq + Hash + Send + ToString + 'static, AE: AsyncExecutor + 'static, IG: ServerConnectionHandler<T> + 'static> ListenerBuilder<T, AE, IG> {
/// Creates a builder from the server key pair and the connection handler.
///
/// Starts with no settings, no flows, the random decoy factory as default and no default
/// probe factory.
pub fn new(key_pair: ServerKeyPair, identity_generator: IG) -> Self {
    let secret = key_pair.into_server_secret();
    let default_decoy_factory = random_decoy_factory();
    Self {
        secret,
        identity_generator,
        default_decoy_factory,
        settings: None,
        flow_configs: Vec::new(),
        default_probe_factory: None,
    }
}
pub fn with_settings(mut self, settings: Arc<Settings<AE>>) -> Self {
self.settings = Some(settings);
self
}
pub fn with_decoy_factory(mut self, factory: DecoyFactory<T, AE>) -> Self {
self.default_decoy_factory = factory;
self
}
/// Replaces the default decoy factory with the one produced for the communication mode `DP`.
pub fn with_decoy<DP: crate::flow::decoy::DecoyCommunicationMode<T, AE> + 'static>(self) -> Self {
    self.with_decoy_factory(crate::flow::decoy::decoy_factory::<T, AE, DP>())
}
pub fn with_probe_factory(mut self, factory: ProbeFactory<AE>) -> Self {
self.default_probe_factory = Some(factory);
self
}
/// Sets the default probe factory to the one produced for the probe handler type `PM`.
pub fn with_probe<PM: crate::flow::probe::ActiveProbeHandler<AE> + Default + 'static>(self) -> Self {
    self.with_probe_factory(crate::flow::probe::probe_factory::<AE, PM>())
}
/// Appends one flow configuration to the list of flows to build.
pub fn add_flow(self, config: ServerFlowConfiguration<T, AE>) -> Self {
    let mut builder = self;
    builder.flow_configs.push(config);
    builder
}
pub fn with_flows(mut self, configs: Vec<ServerFlowConfiguration<T, AE>>) -> Self {
self.flow_configs = configs;
self
}
/// Builds the listener (variant compiled for the `fast_software` / `fast_hardware` features).
///
/// Checks every flow configuration against the MTU, obtains the sockets of each flow (the
/// supplied socket, or freshly bound ones), creates one `ServerFlowManager` per flow and
/// assembles the shared `Router`. `max_data_payload` is the minimum `max_user_payload`
/// reported by the flows.
///
/// # Errors
/// Returns `UnsupportedRuntime` when `assert_runtime` fails, `NoFlows` when no flow was
/// configured, `FlowError` when a flow configuration is rejected, and `SocketError` when
/// binding a socket fails.
///
/// # Panics
/// Panics if a flow configuration carries neither a socket nor an address.
#[cfg(any(feature = "fast_software", feature = "fast_hardware"))]
pub async fn build(mut self) -> Result<Listener<T, AE, IG>, ServerSocketError> {
assert_runtime().map_err(ServerSocketError::UnsupportedRuntime)?;
if self.flow_configs.is_empty() {
return Err(ServerSocketError::NoFlows);
}
let settings = self.settings.take().unwrap_or_else(|| Arc::new(Settings::default()));
// One shared user map; every flow gets its own caches created from it.
let users: SharedMap<T, UserServerState> = SharedMap::new();
let mut flows = Vec::with_capacity(self.flow_configs.len());
let tailer_wire_len = Tailer::<T>::encrypted_len_s2c();
// `usize::MAX` acts as "not yet constrained"; it is lowered by each flow below.
let mut max_data_payload = usize::MAX;
let obfs_buffer = self.secret.obfuscation_buffer();
for flow_config in self.flow_configs.drain(..) {
flow_config.config.assert(settings.mtu()).map_err(ServerSocketError::FlowError)?;
max_data_payload = max_data_payload.min(flow_config.config.max_user_payload(settings.mtu(), PAYLOAD_CRYPTO_OVERHEAD, tailer_wire_len));
// A caller-supplied socket is used as-is; otherwise the configured address is bound.
let socks: Vec<Arc<Socket>> = if let Some(socket) = flow_config.socket {
vec![Arc::new(socket)]
} else {
let address = flow_config.address.expect("ServerFlowConfiguration must have either socket or address");
cfg_if::cfg_if! {
if #[cfg(target_os = "linux")] {
// Several readers are only created on Linux, via `bind_reuse_port`.
if flow_config.reader_count > 1 {
Socket::bind_reuse_port(address, flow_config.reader_count)
.map_err(ServerSocketError::SocketError)?
.into_iter().map(Arc::new).collect()
} else {
vec![Arc::new(Socket::bind(address).await.map_err(ServerSocketError::SocketError)?)]
}
} else {
vec![Arc::new(Socket::bind(address).await.map_err(ServerSocketError::SocketError)?)]
}
}
};
// Per-flow factories take precedence over the builder defaults.
let decoy_factory = flow_config.decoy_factory.unwrap_or_else(|| Arc::clone(&self.default_decoy_factory));
let probe_factory = flow_config.probe_factory.as_ref().or(self.default_probe_factory.as_ref());
let crypto_send = ServerCryptoTool::new(users.create_cache(), obfs_buffer);
let crypto_recv = ServerCryptoTool::new(users.create_cache(), obfs_buffer);
let flow = ServerFlowManager::new(flow_config.config, probe_factory, crypto_send, crypto_recv, settings.clone(), socks, decoy_factory).await;
flows.push(flow);
}
// Fall back to the MTU if no flow lowered the sentinel value.
let max_data_payload = if max_data_payload == usize::MAX {
settings.mtu()
} else {
max_data_payload
};
info!("listener built: max_data_payload={}B (mtu={}B, {} flow(s))", max_data_payload, settings.mtu(), flows.len());
let (accept_tx, accept_rx) = create_notify_queue::<ClientHandle<T, AE>>();
let router = Arc::new(Router {
flows,
sessions: RwLock::new(HashMap::new()),
users: Mutex::new(users),
});
Ok(Listener {
router,
secret: self.secret,
identity_generator: self.identity_generator,
accept_tx,
accept_rx: Mutex::new(accept_rx),
max_data_payload,
settings,
})
}
/// Builds the listener (variant compiled for the `full_software` / `full_hardware` features).
///
/// Checks every flow configuration against the MTU, obtains the sockets of each flow (the
/// supplied socket, or freshly bound ones), creates one `ServerFlowManager` per flow and
/// assembles the shared `Router`. In this variant the server secret is wrapped in an `Arc`
/// and shared with every flow's crypto tools. `max_data_payload` is the minimum
/// `max_user_payload` reported by the flows.
///
/// # Errors
/// Returns `UnsupportedRuntime` when `assert_runtime` fails, `NoFlows` when no flow was
/// configured, `FlowError` when a flow configuration is rejected, and `SocketError` when
/// binding a socket fails.
///
/// # Panics
/// Panics if a flow configuration carries neither a socket nor an address.
#[cfg(any(feature = "full_software", feature = "full_hardware"))]
pub async fn build(mut self) -> Result<Listener<T, AE, IG>, ServerSocketError> {
assert_runtime().map_err(ServerSocketError::UnsupportedRuntime)?;
if self.flow_configs.is_empty() {
return Err(ServerSocketError::NoFlows);
}
let settings = self.settings.take().unwrap_or_else(|| Arc::new(Settings::default()));
// One shared user map; every flow gets its own caches created from it.
let users: SharedMap<T, UserServerState> = SharedMap::new();
let mut flows = Vec::with_capacity(self.flow_configs.len());
let tailer_wire_len = Tailer::<T>::encrypted_len_s2c();
// `usize::MAX` acts as "not yet constrained"; it is lowered by each flow below.
let mut max_data_payload = usize::MAX;
let secret_arc = Arc::new(self.secret);
for flow_config in self.flow_configs.drain(..) {
flow_config.config.assert(settings.mtu()).map_err(ServerSocketError::FlowError)?;
max_data_payload = max_data_payload.min(flow_config.config.max_user_payload(settings.mtu(), PAYLOAD_CRYPTO_OVERHEAD, tailer_wire_len));
// A caller-supplied socket is used as-is; otherwise the configured address is bound.
let socks: Vec<Arc<Socket>> = match flow_config.socket {
Some(socket) => vec![Arc::new(socket)],
None => {
let address = flow_config.address.expect("ServerFlowConfiguration must have either socket or address");
cfg_if::cfg_if! {
if #[cfg(target_os = "linux")] {
// Several readers are only created on Linux, via `bind_reuse_port`.
if flow_config.reader_count > 1 {
Socket::bind_reuse_port(address, flow_config.reader_count)
.map_err(ServerSocketError::SocketError)?
.into_iter().map(Arc::new).collect()
} else {
vec![Arc::new(Socket::bind(address).await.map_err(ServerSocketError::SocketError)?)]
}
} else {
vec![Arc::new(Socket::bind(address).await.map_err(ServerSocketError::SocketError)?)]
}
}
}
};
// Per-flow factories take precedence over the builder defaults.
let decoy_factory = flow_config.decoy_factory.unwrap_or_else(|| Arc::clone(&self.default_decoy_factory));
let probe_factory = flow_config.probe_factory.as_ref().or(self.default_probe_factory.as_ref());
let crypto_send = ServerCryptoTool::new(users.create_cache(), Arc::clone(&secret_arc));
let crypto_recv = ServerCryptoTool::new(users.create_cache(), Arc::clone(&secret_arc));
let flow = ServerFlowManager::new(flow_config.config, probe_factory, crypto_send, crypto_recv, settings.clone(), socks, decoy_factory).await;
flows.push(flow);
}
// Fall back to the MTU if no flow lowered the sentinel value.
let max_data_payload = if max_data_payload == usize::MAX {
settings.mtu()
} else {
max_data_payload
};
info!("listener built: max_data_payload={}B (mtu={}B, {} flow(s))", max_data_payload, settings.mtu(), flows.len());
let (accept_tx, accept_rx) = create_notify_queue::<ClientHandle<T, AE>>();
let router = Arc::new(Router {
flows,
sessions: RwLock::new(HashMap::new()),
users: Mutex::new(users),
});
Ok(Listener {
router,
secret: secret_arc,
identity_generator: self.identity_generator,
accept_tx,
accept_rx: Mutex::new(accept_rx),
max_data_payload,
settings,
})
}
}
/// Shared routing state of a listener: the flows, the live sessions and the user map.
pub(crate) struct Router<T: IdentityType + Clone + Eq + Hash + Send + ToString + 'static, AE: AsyncExecutor + 'static> {
// Flow managers, indexed by the flow index used throughout the listener.
flows: Vec<Arc<ServerFlowManager<T, AE>>>,
// Live sessions keyed by client identity.
sessions: RwLock<HashMap<T, Arc<ServerSessionManager<T, AE>>>>,
// Per-user server state shared with the flows' crypto caches.
users: Mutex<SharedMap<T, UserServerState>>,
}
impl<T: IdentityType + Clone + Eq + Hash + Send + ToString + 'static, AE: AsyncExecutor + 'static> Router<T, AE> {
    /// Returns how many flows this router manages.
    #[inline]
    pub(crate) fn flow_count(&self) -> usize {
        let Self { flows, .. } = self;
        flows.len()
    }
}
/// Server-side listener: receives packets on every flow, performs handshakes and hands
/// accepted clients out through `accept`.
pub struct Listener<T: IdentityType + Clone + Eq + Hash + Send + ToString + 'static, AE: AsyncExecutor + 'static, IG: ServerConnectionHandler<T> + 'static> {
// Shared routing state (flows, sessions, users).
router: Arc<Router<T, AE>>,
// Server secret, owned directly in the "fast" feature variants.
#[cfg(any(feature = "fast_software", feature = "fast_hardware"))]
secret: ServerSecret<'static>,
// Server secret, shared with the flows' crypto tools in the "full" feature variants.
#[cfg(any(feature = "full_software", feature = "full_hardware"))]
secret: Arc<ServerSecret<'static>>,
// Verifies client versions and generates identities / initial data during handshakes.
identity_generator: IG,
// Sending side of the queue of accepted clients.
accept_tx: NotifyQueueSender<ClientHandle<T, AE>>,
// Receiving side of the queue of accepted clients, drained by `accept`.
accept_rx: Mutex<NotifyQueueReceiver<ClientHandle<T, AE>>>,
// Payload limit computed at build time and copied into every `ClientHandle`.
max_data_payload: usize,
// Settings shared with flows, sessions and client handles.
settings: Arc<Settings<AE>>,
}
impl<T: IdentityType + Clone + Eq + Hash + Send + ToString + 'static, AE: AsyncExecutor + 'static, IG: ServerConnectionHandler<T> + 'static> Listener<T, AE, IG> {
/// Creates the crypto state used for a client before its session key is established
/// ("fast" feature variants: the server's obfuscation buffer is passed along).
#[cfg(any(feature = "fast_software", feature = "fast_hardware"))]
#[inline]
fn make_initial_crypto_state(&self, initial_key: &impl ByteBuffer) -> UserCryptoState {
    let obfuscation = self.secret.obfuscation_buffer();
    UserCryptoState::new(initial_key, obfuscation)
}
/// Creates the crypto state used for a client before its session key is established
/// ("full" feature variants: built from the initial key alone).
#[cfg(any(feature = "full_software", feature = "full_hardware"))]
#[inline]
fn make_initial_crypto_state(&self, initial_key: &impl ByteBuffer) -> UserCryptoState {
UserCryptoState::new(initial_key)
}
/// Switches `user_state` to the negotiated `session_key`
/// ("fast" feature variants: the server's obfuscation buffer is passed along).
#[cfg(any(feature = "fast_software", feature = "fast_hardware"))]
#[inline]
fn upgrade_user_crypto(&self, user_state: &mut UserServerState, session_key: &impl ByteBuffer) {
    let obfuscation = self.secret.obfuscation_buffer();
    user_state.upgrade_crypto(session_key, obfuscation);
}
/// Switches `user_state` to the negotiated `session_key`
/// ("full" feature variants: only the session key is needed).
#[cfg(any(feature = "full_software", feature = "full_hardware"))]
#[inline]
fn upgrade_user_crypto(&self, user_state: &mut UserServerState, session_key: &impl ByteBuffer) {
user_state.upgrade_crypto(session_key);
}
pub fn start(self: &Arc<Self>) -> impl Future<Output = ()> {
let drain_capacity = self.settings.get(&keys::DRAIN_CHANNEL_CAPACITY) as usize;
for (index, flow) in self.router.flows.iter().enumerate() {
let (drain_tx, mut drain_rx) = create_bounded_notify_queue(drain_capacity);
let drain_tx = Arc::new(drain_tx);
for (sock_index, sock) in flow.recv_socks().iter().enumerate() {
let drain_tx = Arc::clone(&drain_tx);
let sock = Arc::clone(sock);
let flow_drain = Arc::clone(flow);
let settings_drain = Arc::clone(&self.settings);
self.settings.executor().spawn(async move {
loop {
let recv_buf = settings_drain.pool().allocate_for_recv();
match flow_drain.receive_raw(recv_buf, &sock).await {
Ok(raw_packet) => drain_tx.push(raw_packet),
Err(err) => {
warn!("flow manager {index} socket {sock_index}: receive error: {err}");
break;
}
}
}
});
}
drop(drain_tx);
let listener = Arc::clone(self);
self.settings.executor().spawn(async move {
while let Some(raw_packet) = drain_rx.recv().await {
listener.route_incoming(raw_packet, index).await;
}
});
}
async {}
}
/// Dispatches one received packet.
///
/// Handshake packets go to `handle_new_client`. Any other packet is handed to the session
/// registered for its identity; packets from unknown identities are dropped. When the
/// session reports `ConnectionTerminated`, the session is removed from the router.
async fn route_incoming(self: &Arc<Self>, raw_packet: RawReceivedPacket<T>, flow_index: usize) {
    let identity = raw_packet.tailer.identity();
    let is_handshake = raw_packet.tailer.flags().contains(PacketFlags::HANDSHAKE);
    if is_handshake {
        self.handle_new_client(raw_packet, flow_index).await;
        return;
    }

    // The read guard is released at the end of this statement, before any further await.
    let known_session = self.router.sessions.read().await.get(&identity).cloned();
    let Some(session) = known_session else {
        debug!("packet from unknown identity {}, dropping", identity.to_string());
        return;
    };

    self.router.flows[flow_index].ensure_user(identity.clone(), session.counter()).await;
    session.note_active_flow(flow_index);

    let incoming = IncomingPacket {
        body: raw_packet.body,
        tailer: raw_packet.tailer,
    };
    let Err(err) = session.process_incoming(incoming).await else {
        return;
    };
    debug!("session processing error for {}: {}", identity.to_string(), err);
    if let SessionControllerError::ConnectionTerminated(_) = err {
        self.router.remove_session(&identity).await;
    }
}
/// Processes a handshake packet received on flow `flow_index`.
///
/// Steps, in order: decapsulate the handshake body, verify the deferred transcript with the
/// initial key, check the client version, generate the client identity, assemble the
/// session, send the handshake response, upgrade the user's crypto state to the session key,
/// register the session and publish a `ClientHandle` on the accept queue.
///
/// Packets that fail decapsulation or verification are forwarded to the flow's probe
/// handler when the original wire packet is available, and dropped otherwise. Failures are
/// only logged; the function returns nothing.
async fn handle_new_client(self: &Arc<Self>, mut raw_packet: RawReceivedPacket<T>, flow_index: usize) {
// Take the deferred pieces out first: `raw_packet.body` is moved by the call below.
let handshake_transcript = raw_packet.handshake_transcript.take();
let original_wire_packet = raw_packet.original_wire_packet.take();
let source_addr = raw_packet.source_addr;
let Some((server_data, initial_key, client_initial_data)) = self.secret.decapsulate_handshake_server(raw_packet.body, self.settings.pool()) else {
if let Some(packet) = original_wire_packet {
debug!("handshake decapsulation failed from {source_addr} (body too short for crypto header), forwarding to probe handler");
self.router.flows[flow_index].forward_to_probe(packet, source_addr).await;
} else {
debug!("handshake decapsulation failed from {source_addr} and original wire packet unavailable, dropping");
}
return;
};
// Verification needs both the transcript and the wire packet to be present.
let verified = matches!((&handshake_transcript, &original_wire_packet), (Some(transcript), Some(_)) if verify_transcript_with_key(&initial_key, transcript).is_ok());
if !verified {
if let Some(packet) = original_wire_packet {
debug!("handshake tailer verification failed from {source_addr}, forwarding to probe handler");
self.router.flows[flow_index].forward_to_probe(packet, source_addr).await;
} else {
debug!("handshake packet from {source_addr} missing deferred transcript or wire packet, dropping");
}
return;
}
let client_version_identity = raw_packet.tailer.identity();
let handshake_pn = raw_packet.tailer.packet_number();
if !self.identity_generator.verify_version(client_version_identity.to_bytes()) {
// Version mismatch: register the client temporarily (user state and address binding),
// send a termination packet, then remove both registrations again.
{
let mut users = self.router.users.lock().await;
let crypto_state = self.make_initial_crypto_state(&initial_key);
users.insert(client_version_identity.clone(), UserServerState::new(crypto_state)).await;
}
self.router.flows[flow_index].register_user_binding(client_version_identity.clone(), raw_packet.source_addr, handshake_pn).await;
// Packet number: current Unix time in seconds, placed in the upper 32 bits.
let pn = ((unix_timestamp_ms() / 1000) as u64) << 32;
let buf = self.settings.pool().allocate(Some(T::length()));
let tailer = Tailer::termination(buf, &client_version_identity, ReturnCode::VersionMismatch, pn);
if let Err(err) = self.router.flows[flow_index].send_packet(tailer.into_buffer(), false, false).await {
warn!("failed to send version mismatch rejection: {err}");
}
{
let mut users = self.router.users.lock().await;
users.remove(&client_version_identity).await;
}
self.router.flows[flow_index].remove_user(&client_version_identity).await;
return;
}
let identity = self.identity_generator.generate(client_initial_data.slice());
let server_initial_data = self.identity_generator.initial_data(&identity);
let (incoming_tx, incoming_rx) = create_notify_queue::<DynamicByteBuffer>();
// The session only gets a weak handle to the router.
let router_weak: Weak<dyn OutgoingRouter<T>> = Arc::downgrade(&self.router) as Weak<dyn OutgoingRouter<T>>;
let (response_body, session_key) = self.secret.encapsulate_handshake_server(server_data, self.settings.pool(), server_initial_data.slice(), &initial_key);
let (session, response_packet, replacing) = {
// The users lock is held for the whole block, including session assembly.
let mut users = self.router.users.lock().await;
let replacing = users.contains_key(&identity);
if replacing {
debug!("re-handshake for {}: replacing existing session (last wins)", identity.to_string());
users.remove(&identity).await;
}
let initial_crypto_state = self.make_initial_crypto_state(&initial_key);
let result = ServerSessionManager::assemble_session(initial_crypto_state, response_body, raw_packet.tailer, identity.clone(), &mut users, incoming_tx, router_weak, self.router.flow_count(), self.settings.clone()).await;
match result {
Ok((session, response_packet)) => (session, response_packet, replacing),
Err(err) => {
// NOTE(review): when `replacing` is true the old user state has already been removed
// above, but the old entry in `sessions` and the flows' registrations are left
// untouched on this path — confirm this is intended.
warn!("handshake failed: {err}");
return;
}
}
};
if replacing {
// Drop the displaced session and its registration on every flow.
self.router.sessions.write().await.remove(&identity);
for flow in &self.router.flows {
flow.remove_user(&identity).await;
}
}
self.router.flows[flow_index].register_user_binding(identity.clone(), raw_packet.source_addr, handshake_pn).await;
self.router.flows[flow_index].register_user(identity.clone(), session.counter()).await;
if let Err(err) = self.router.flows[flow_index].send_packet(response_packet, false, false).await {
// Roll back the registrations made for this handshake.
warn!("failed to send handshake response: {err}");
self.router.users.lock().await.remove(&identity).await;
for flow in &self.router.flows {
flow.remove_user(&identity).await;
}
return;
}
// The crypto state is switched to the session key only after the response was sent.
{
let mut users = self.router.users.lock().await;
users
.modify(&identity, |user_state| {
self.upgrade_user_crypto(user_state, &session_key);
})
.await;
}
session.note_active_flow(flow_index);
{
let mut sessions = self.router.sessions.write().await;
if sessions.contains_key(&identity) {
debug!("concurrent handshake for {}: last wins, displacing earlier session", identity.to_string());
}
sessions.insert(identity.clone(), Arc::clone(&session));
}
let client_handle = ClientHandle {
session,
identity: identity.clone(),
incoming_rx: Mutex::new(incoming_rx),
max_data_payload: self.max_data_payload,
settings: self.settings.clone(),
router: Arc::clone(&self.router),
};
// Make the new client available to `accept`.
self.accept_tx.push(client_handle);
info!("new client connected: {}", identity.to_string());
}
/// Waits for the next accepted client.
///
/// # Errors
/// Returns `ServerSocketError::ListenerStopped` when the accept queue yields no more handles.
pub async fn accept(&self) -> Result<ClientHandle<T, AE>, ServerSocketError> {
    match self.accept_rx.lock().await.recv().await {
        Some(handle) => Ok(handle),
        None => Err(ServerSocketError::ListenerStopped),
    }
}
}
#[async_trait]
impl<T: IdentityType + Clone + Eq + Hash + Send + Sync + ToString + 'static, AE: AsyncExecutor + 'static> OutgoingRouter<T> for Router<T, AE> {
    /// Sends `packet` to the client `identity` over the flow selected by its session.
    ///
    /// Returns `false` when no session is registered for `identity`, when the selected flow
    /// index is out of range, or when sending fails; `true` otherwise.
    async fn route_packet(&self, packet: DynamicByteBuffer, identity: &T) -> bool {
        // The read guard is released at the end of this statement.
        let found = self.sessions.read().await.get(identity).cloned();
        let Some(session) = found else {
            return false;
        };
        let flow_idx = session.select_active_flow(self.flows.len());
        match self.flows.get(flow_idx) {
            Some(flow) => flow.send_packet(packet, false, false).await.is_ok(),
            None => false,
        }
    }

    /// Removes the session of `identity` together with its user state and flow registrations.
    ///
    /// Does nothing when no session is registered for `identity`.
    async fn remove_session(&self, identity: &T) {
        let was_registered = self.sessions.write().await.remove(identity).is_some();
        if !was_registered {
            return;
        }
        self.users.lock().await.remove(identity).await;
        for flow in self.flows.iter() {
            flow.remove_user(identity).await;
        }
        info!("client session removed: {}", identity.to_string());
    }
}
/// Handle to one connected client, obtained from `Listener::accept`.
///
/// Dropping the handle sends a termination packet to the client and removes its session
/// from the router.
pub struct ClientHandle<T: IdentityType + Clone + Eq + Hash + Send + ToString + 'static, AE: AsyncExecutor + 'static> {
// Session that prepares outgoing packets for this client.
session: Arc<ServerSessionManager<T, AE>>,
// Identity generated for the client during the handshake.
identity: T,
// Queue of payloads received from the client.
incoming_rx: Mutex<NotifyQueueReceiver<DynamicByteBuffer>>,
// Payload limit copied from the listener; `send_bytes` splits data larger than this.
max_data_payload: usize,
// Shared settings (buffer pool, executor, tunables).
settings: Arc<Settings<AE>>,
// Router used to send packets and to remove the session on drop.
router: Arc<Router<T, AE>>,
}
impl<T: IdentityType + Clone + Eq + Hash + Send + ToString, AE: AsyncExecutor> ClientHandle<T, AE> {
    /// Sends one packet to the client.
    ///
    /// The packet is prepared by the session and then routed through the router.
    ///
    /// # Errors
    /// Returns `SessionError` when the session fails to prepare the packet, or
    /// `SessionError(FlowError(UserNotFound))` when the router could not deliver it.
    pub async fn send(&self, packet: DynamicByteBuffer) -> Result<(), ServerSocketError> {
        let wire = self.session.prepare_outgoing(packet, false).await.map_err(ServerSocketError::SessionError)?;
        if !self.router.route_packet(wire, &self.identity).await {
            return Err(ServerSocketError::SessionError(SessionControllerError::FlowError(FlowControllerError::UserNotFound {
                identity: self.identity.to_string(),
            })));
        }
        Ok(())
    }

    /// Sends `data`, splitting it into several packets when it exceeds `max_data_payload`.
    ///
    /// Data that fits is sent as a single packet. Otherwise each chunk size comes from
    /// `jittered_chunk_size`, configured by `keys::SEND_BYTES_CHUNK` and
    /// `keys::SEND_BYTES_JITTER`. Empty `data` sends nothing.
    ///
    /// # Errors
    /// Stops at, and returns, the first error reported by [`Self::send`]; chunks sent before
    /// the failure are not rolled back.
    pub async fn send_bytes(&self, data: &[u8]) -> Result<(), ServerSocketError> {
        let jitter = self.settings.get(&keys::SEND_BYTES_JITTER);
        let chunk = self.settings.get(&keys::SEND_BYTES_CHUNK) as usize;
        let mut offset = 0;
        while offset < data.len() {
            let remaining = data.len() - offset;
            let chunk_size = if remaining <= self.max_data_payload {
                remaining
            } else {
                // Guard the helper's result: a zero-sized chunk would never advance `offset`
                // (an endless stream of empty packets), and a chunk larger than `remaining`
                // would panic on the slice below. `remaining` is at least 1 in this branch.
                jittered_chunk_size(self.max_data_payload, chunk, jitter).clamp(1, remaining)
            };
            let buffer = self.settings.pool().allocate(Some(chunk_size));
            buffer.slice_mut().copy_from_slice(&data[offset..offset + chunk_size]);
            self.send(buffer).await?;
            offset += chunk_size;
        }
        Ok(())
    }

    /// Returns the payload size above which `send_bytes` splits data into several packets.
    pub fn max_data_payload(&self) -> usize {
        self.max_data_payload
    }

    /// Waits for the next payload received from the client.
    ///
    /// # Errors
    /// Returns `ServerSocketError::ChannelClosed` when the incoming queue yields no more data.
    pub async fn receive(&self) -> Result<DynamicByteBuffer, ServerSocketError> {
        self.incoming_rx.lock().await.recv().await.ok_or(ServerSocketError::ChannelClosed)
    }

    /// Like [`Self::receive`], but copies the payload into a new `Vec<u8>`.
    ///
    /// # Errors
    /// Propagates the error of [`Self::receive`].
    pub async fn receive_bytes(&self) -> Result<Vec<u8>, ServerSocketError> {
        let buffer = self.receive().await?;
        Ok(buffer.slice().to_vec())
    }
}
impl<T: IdentityType + Clone + Eq + Hash + Send + ToString + 'static, AE: AsyncExecutor + 'static> Drop for ClientHandle<T, AE> {
    /// Best-effort teardown: sends a termination packet (`ReturnCode::Success`) to the client
    /// and removes its session from the router.
    ///
    /// The result of sending the termination packet is ignored.
    fn drop(&mut self) {
        let executor = self.settings.executor().clone();
        // Packet number: current Unix time in seconds in the upper 32 bits, the same layout
        // `Listener::handle_new_client` uses for its termination packet. A shift (rather than
        // an overflow-checked multiply) guarantees this can never panic inside `drop`.
        let pn = ((unix_timestamp_ms() / 1000) as u64) << 32;
        let buf = self.settings.pool().allocate(Some(Tailer::<T>::len()));
        let termination = Tailer::termination(buf, &self.identity, ReturnCode::Success, pn).into_buffer();
        // NOTE(review): this blocks the dropping thread until both steps finish; confirm that
        // `block_on` is safe to call when the handle is dropped from inside an async task.
        executor.block_on(async {
            self.router.route_packet(termination, &self.identity).await;
            self.router.remove_session(&self.identity).await;
        });
    }
}