use crate::common::model::{
NodeDispatchEnvelope, NodeErrorEnvelope, RequestDispatchEnvelope, ResponseDispatchEnvelope,
TaskDispatchEnvelope,
};
use crate::queue::QueuedItem;
use crate::utils::logger::LogModel;
use std::sync::Arc;
use tokio::sync::Mutex;
use tokio::sync::mpsc::{Receiver, Sender, channel};
/// Bundle of every `tokio::sync::mpsc` queue declared in this file.
///
/// Each queue is stored as a `*_sender` / `*_receiver` pair carrying `QueuedItem<_>`
/// values. Receivers are wrapped in `Arc<Mutex<_>>` (the `tokio::sync::Mutex`), so a
/// receiver can be reached from several owners of a `Channel`.
///
/// `Clone` is derived: every field is either a `Sender` or an `Arc`, so the derived
/// implementation clones field by field. A cloned `Channel` therefore holds new
/// handles to the *same* queues (cloned senders, shared receivers); it never creates
/// new queues.
///
/// NOTE(review): the `remote_*` queues carry the same payload types as their
/// non-prefixed counterparts; presumably they separate cross-node traffic from local
/// traffic — confirm against the callers.
#[derive(Clone)]
pub struct Channel {
    /// Sending half of the task queue (`TaskDispatchEnvelope` items).
    pub task_sender: Sender<QueuedItem<TaskDispatchEnvelope>>,
    /// Shared receiving half of the task queue.
    pub task_receiver: Arc<Mutex<Receiver<QueuedItem<TaskDispatchEnvelope>>>>,

    /// Sending half of the request queue (`RequestDispatchEnvelope` items).
    pub request_sender: Sender<QueuedItem<RequestDispatchEnvelope>>,
    /// Shared receiving half of the request queue.
    pub request_receiver: Arc<Mutex<Receiver<QueuedItem<RequestDispatchEnvelope>>>>,

    /// Sending half of the download-request queue; same payload type as the request queue.
    pub download_request_sender: Sender<QueuedItem<RequestDispatchEnvelope>>,
    /// Shared receiving half of the download-request queue.
    pub download_request_receiver: Arc<Mutex<Receiver<QueuedItem<RequestDispatchEnvelope>>>>,

    /// Sending half of the response queue (`ResponseDispatchEnvelope` items).
    pub response_sender: Sender<QueuedItem<ResponseDispatchEnvelope>>,
    /// Shared receiving half of the response queue.
    pub response_receiver: Arc<Mutex<Receiver<QueuedItem<ResponseDispatchEnvelope>>>>,

    /// Sending half of the error queue (`NodeErrorEnvelope` items).
    pub error_sender: Sender<QueuedItem<NodeErrorEnvelope>>,
    /// Shared receiving half of the error queue.
    pub error_receiver: Arc<Mutex<Receiver<QueuedItem<NodeErrorEnvelope>>>>,

    /// Sending half of the log queue (`LogModel` items).
    pub log_sender: Sender<QueuedItem<LogModel>>,
    /// Shared receiving half of the log queue.
    pub log_receiver: Arc<Mutex<Receiver<QueuedItem<LogModel>>>>,

    /// Sending half of the parser-task queue (`NodeDispatchEnvelope` items).
    pub parser_task_sender: Sender<QueuedItem<NodeDispatchEnvelope>>,
    /// Shared receiving half of the parser-task queue.
    pub parser_task_receiver: Arc<Mutex<Receiver<QueuedItem<NodeDispatchEnvelope>>>>,

    /// Sending half of the remote response queue (`ResponseDispatchEnvelope` items).
    pub remote_response_sender: Sender<QueuedItem<ResponseDispatchEnvelope>>,
    /// Shared receiving half of the remote response queue.
    pub remote_response_receiver: Arc<Mutex<Receiver<QueuedItem<ResponseDispatchEnvelope>>>>,

    /// Sending half of the remote parser-task queue (`NodeDispatchEnvelope` items).
    pub remote_parser_task_sender: Sender<QueuedItem<NodeDispatchEnvelope>>,
    /// Shared receiving half of the remote parser-task queue.
    pub remote_parser_task_receiver: Arc<Mutex<Receiver<QueuedItem<NodeDispatchEnvelope>>>>,

    /// Sending half of the remote error queue (`NodeErrorEnvelope` items).
    pub remote_error_sender: Sender<QueuedItem<NodeErrorEnvelope>>,
    /// Shared receiving half of the remote error queue.
    pub remote_error_receiver: Arc<Mutex<Receiver<QueuedItem<NodeErrorEnvelope>>>>,

    /// Sending half of the remote task queue (`TaskDispatchEnvelope` items).
    pub remote_task_sender: Sender<QueuedItem<TaskDispatchEnvelope>>,
    /// Shared receiving half of the remote task queue.
    pub remote_task_receiver: Arc<Mutex<Receiver<QueuedItem<TaskDispatchEnvelope>>>>,
}
impl Channel {
pub fn new(capacity: usize) -> Self {
let effective_capacity = if capacity < 10000 { 10000 } else { capacity };
let (task_sender, task_receiver) = channel(effective_capacity);
let (request_sender, request_receiver) = channel(effective_capacity);
let (download_request_sender, download_request_receiver) = channel(effective_capacity);
let (response_sender, response_receiver) = channel(effective_capacity);
let (error_sender, error_receiver) = channel(effective_capacity);
let (log_sender, log_receiver) = channel(10000);
let (parser_task_sender, parser_task_receiver) = channel(effective_capacity);
let (remote_response_sender, remote_response_receiver) = channel(effective_capacity);
let (remote_parser_task_sender, remote_parser_task_receiver) = channel(effective_capacity);
let (remote_error_sender, remote_error_receiver) = channel(effective_capacity);
let (remote_task_sender, remote_task_receiver) = channel(effective_capacity);
Channel {
task_sender,
task_receiver: Arc::new(Mutex::new(task_receiver)),
request_sender,
request_receiver: Arc::new(Mutex::new(request_receiver)),
download_request_sender,
download_request_receiver: Arc::new(Mutex::new(download_request_receiver)),
response_sender,
response_receiver: Arc::new(Mutex::new(response_receiver)),
error_sender,
error_receiver: Arc::new(Mutex::new(error_receiver)),
log_sender,
log_receiver: Arc::new(Mutex::new(log_receiver)),
parser_task_sender,
parser_task_receiver: Arc::new(Mutex::new(parser_task_receiver)),
remote_response_sender,
remote_response_receiver: Arc::new(Mutex::new(remote_response_receiver)),
remote_parser_task_sender,
remote_parser_task_receiver: Arc::new(Mutex::new(remote_parser_task_receiver)),
remote_error_sender,
remote_error_receiver: Arc::new(Mutex::new(remote_error_receiver)),
remote_task_sender,
remote_task_receiver: Arc::new(Mutex::new(remote_task_receiver)),
}
}
}