use anyhow::{Error, Result};
use env_logger::Env;
use futures_util::{StreamExt, TryStreamExt, future::join_all, stream};
use std::{mem, time::Duration};
use tokio::{
io::{AsyncReadExt, AsyncWriteExt},
net::TcpStream,
task::JoinHandle,
time::{Instant, sleep_until, timeout},
};
use tonari_actor::{Actor, AsyncActor, BareContext, Context, Recipient, System};
/// Messages understood by the [`Crawler`] actor.
enum CrawlerMessage {
    /// Crawl every host in `hosts` concurrently, forwarding what is read to the sorter.
    Crawl { hosts: Vec<String> },
    /// Forward `SorterCollectorMessage::Finish` to the sorter.
    Finish,
}

/// Actor that crawls hosts and streams the received chunks to a sorter actor.
struct Crawler {
    /// Destination for every crawled chunk and for the final `Finish`.
    sorter: Recipient<SorterCollectorMessage>,
}

impl AsyncActor for Crawler {
    type Error = Error;
    type Message = CrawlerMessage;

    /// Handles one crawler message.
    ///
    /// `Crawl` runs `crawl_host` for every host concurrently, with no concurrency limit, and
    /// returns the first error any of them reports. `Finish` forwards
    /// `SorterCollectorMessage::Finish` to the sorter.
    async fn handle(
        &mut self,
        _context: &BareContext<CrawlerMessage>,
        message: CrawlerMessage,
    ) -> Result<()> {
        match message {
            CrawlerMessage::Finish => {
                log::debug!("Crawler finished, propagating to Sorter...");
                self.sorter.send(SorterCollectorMessage::Finish)?;
            },
            CrawlerMessage::Crawl { hosts } => {
                let sorter = &self.sorter;
                // `try_for_each_concurrent` operates on a `TryStream`, so lift each host into `Ok`.
                stream::iter(hosts)
                    .map(Ok)
                    .try_for_each_concurrent(None, |host| crawl_host(host, sorter.clone()))
                    .await?;
            },
        }
        Ok(())
    }
}
/// Requests `/` from `host` over plain HTTP/1.0 on port 80 and streams the response to `sorter`.
///
/// At most 5 reads of up to 100 bytes are attempted, each bounded by a 3-second timeout. A
/// timed-out read is reported to `sorter` as a placeholder chunk and does not end the loop. A
/// zero-byte read (end of stream) does end it.
///
/// # Errors
///
/// Returns an error if connecting, writing the request, reading, or sending a chunk fails.
async fn crawl_host(host: String, sorter: Recipient<SorterCollectorMessage>) -> Result<()> {
    log::debug!("Connecting to {host}...");
    let mut connection = TcpStream::connect(format!("{host}:80")).await?;
    log::debug!("Connected to {host}");

    connection.write_all(b"GET / HTTP/1.0\r\n\r\n").await?;
    log::debug!("HTTP request sent to {host}");

    // Reused across reads: only the freshly-filled prefix `buf[..len]` is ever inspected.
    let mut buf = vec![0u8; 100];
    for chunk_index in 0..5 {
        let outcome = timeout(Duration::from_secs(3), connection.read(&mut buf)).await;
        let text = match outcome {
            Err(_elapsed) => format!("<timed out reading chunk {chunk_index}>"),
            Ok(read_result) => {
                let len = read_result?;
                if len == 0 {
                    break;
                }
                String::from_utf8_lossy(&buf[..len]).into_owned()
            },
        };
        sorter.send(SorterCollectorMessage::Chunk(Chunk { host: host.clone(), text }))?;
    }

    log::debug!("Closing connection to {host}.");
    Ok(())
}
/// A piece of text read from one crawled host.
#[derive(Debug)]
struct Chunk {
    /// Host the text was read from.
    host: String,
    /// Lossily UTF-8-decoded response bytes, or a placeholder when a read timed out.
    text: String,
}
/// Messages accepted by both the `Sorter` and the `Collector` actors.
enum SorterCollectorMessage {
    /// A chunk to delay and forward (Sorter) or to print (Collector).
    Chunk(Chunk),
    /// End-of-input signal: the Sorter drains its tasks and forwards it; the Collector shuts
    /// the actor system down.
    Finish,
}
/// Re-orders chunks by holding each one back for a delay derived from its first character.
///
/// Each incoming chunk is forwarded to `collector` at `first_message_reception + delay`. Here
/// `delay` is the Unicode scalar value of the chunk's first character times 20 ms, or 0 for
/// empty text. Chunks therefore tend to reach the collector ordered by their first character.
struct Sorter {
    /// Downstream actor receiving the delayed chunks and, finally, `Finish`.
    collector: Recipient<SorterCollectorMessage>,
    /// One spawned forwarding task per received chunk; all are joined when `Finish` arrives.
    pending_tasks: Vec<JoinHandle<Result<(), Error>>>,
    /// When the first chunk arrived; every delay is measured from this instant.
    first_message_reception: Option<Instant>,
}

impl Sorter {
    /// Creates a sorter forwarding to `collector`, with no chunks received yet.
    fn new(collector: Recipient<SorterCollectorMessage>) -> Self {
        Self { collector, pending_tasks: vec![], first_message_reception: None }
    }
}

impl AsyncActor for Sorter {
    type Error = Error;
    type Message = SorterCollectorMessage;

    const DEFAULT_CAPACITY_NORMAL: usize = 50;

    /// Schedules delayed forwarding for each `Chunk`.
    ///
    /// On `Finish`, waits for every scheduled task, propagates the first failure, and only then
    /// forwards `Finish` to the collector.
    ///
    /// # Errors
    ///
    /// Returns an error if any forwarding task panicked, was cancelled, or failed to send its
    /// chunk. Also returns an error if sending `Finish` to the collector fails.
    async fn handle(
        &mut self,
        _context: &BareContext<SorterCollectorMessage>,
        message: SorterCollectorMessage,
    ) -> Result<()> {
        match message {
            SorterCollectorMessage::Chunk(chunk) => {
                // All delays are relative to the first chunk, so its arrival is recorded once.
                let first_message_reception =
                    *self.first_message_reception.get_or_insert_with(Instant::now);
                let collector = self.collector.clone();
                let task = tokio::spawn(async move {
                    let first_char_value = chunk.text.chars().next().map_or(0, u64::from);
                    let delay = Duration::from_millis(first_char_value * 20);
                    sleep_until(first_message_reception + delay).await;
                    collector.send(SorterCollectorMessage::Chunk(chunk))?;
                    Ok(())
                });
                self.pending_tasks.push(task);
            },
            SorterCollectorMessage::Finish => {
                let initial_task_count = self.pending_tasks.len();
                let finished_task_count =
                    self.pending_tasks.iter().filter(|task| task.is_finished()).count();
                let pending_task_count = initial_task_count - finished_task_count;
                log::debug!(
                    "Waiting for {pending_task_count} pending tasks in Sorter before shutting \
                    down. {} tasks already finished.",
                    finished_task_count
                );
                // Join every task, including those that already finished. Merely dropping a
                // finished `JoinHandle` would discard its result and hide its error.
                let results = join_all(mem::take(&mut self.pending_tasks)).await;
                for result in results {
                    // Outer `?`: the task panicked or was cancelled (`JoinError`).
                    // Inner `?`: the task body itself returned an error.
                    result??;
                }
                log::debug!(
                    "Sorter tasks finished, telling collector to shut down the actor system."
                );
                self.collector.send(SorterCollectorMessage::Finish)?;
            },
        }
        Ok(())
    }
}
/// Terminal actor: prints every chunk it receives and shuts the actor system down on `Finish`.
struct Collector;

impl Actor for Collector {
    type Context = Context<SorterCollectorMessage>;
    type Error = Error;
    type Message = SorterCollectorMessage;

    /// Prints a `Chunk` as the host right-aligned to 20 columns followed by the `Debug`-quoted
    /// text. On `Finish`, asks the system handle to shut down.
    ///
    /// # Errors
    ///
    /// Returns an error if the shutdown request fails.
    fn handle(&mut self, ctx: &mut Self::Context, message: SorterCollectorMessage) -> Result<()> {
        match message {
            SorterCollectorMessage::Chunk(Chunk { host, text }) => println!("{host:>20}: {text:?}"),
            SorterCollectorMessage::Finish => {
                log::debug!("Collector shutting down the actor system.");
                ctx.system_handle.shutdown()?;
            },
        }
        Ok(())
    }
}
/// Wires up the crawler -> sorter -> collector pipeline, queues one crawl plus a `Finish`, and
/// runs the actor system.
fn main() -> Result<()> {
    env_logger::Builder::from_env(Env::default().default_filter_or("debug")).init();

    let mut system = System::new("async actors example");

    // Spawn back to front: each stage needs the recipient of the stage after it.
    let collector = system.spawn(Collector)?;
    let sorter = system.spawn_async(Sorter::new(collector.recipient()))?;
    let crawler = system.spawn_async(Crawler { sorter: sorter.recipient() })?;

    let hosts = Vec::from(["google.com", "captive.apple.com", "httpforever.com"].map(String::from));
    crawler.send(CrawlerMessage::Crawl { hosts })?;
    crawler.send(CrawlerMessage::Finish)?;

    // Presumably blocks until the collector requests shutdown; verify against tonari_actor docs.
    system.run()?;
    Ok(())
}