use super::flow::Flow;
use super::materializer::StreamMaterializer;
use super::sink::Sink;
use super::stream_message::{NotUsed, StreamMessage};
use crate::actors::actor::Actor;
use crate::actors::actor_system::ActorSystem;
use crate::actors::messages::Message;
use log::info;
use std::time::Duration;
#[cfg(feature = "streaming")]
use reqwest;
/// A single-shot stream source: an optional payload plus a materialized value.
///
/// `data` is `None` for an empty source (for example after a rejecting `filter`).
/// The type itself is always available; only the network/file constructors and the
/// actor-wiring methods are gated behind the `streaming` feature.
#[derive(Debug)]
pub struct Source<SourceType, Materializer> {
    /// Value handed back to callers via `to_materializer`.
    mat: Materializer,
    /// The payload, or `None` when the source is empty.
    data: Option<SourceType>,
}
impl Default for Source<(), NotUsed> {
    /// Builds an empty unit source: no payload and a `NotUsed` materializer.
    fn default() -> Self {
        Self {
            data: None,
            mat: NotUsed,
        }
    }
}
/// Errors produced while building a [`Source`] from a URL or from a file.
#[derive(Debug)]
pub enum SourceError {
    /// The URL failed to parse or uses a scheme other than `http`/`https`.
    InvalidUrl(String),
    /// Building the HTTP client, sending the request, or reading the body failed.
    NetworkError(String),
    /// Non-success HTTP status, or content that is not valid UTF-8.
    InvalidResponse(String),
    /// The HTTP request timed out.
    Timeout,
    /// The response body or file contained no bytes.
    EmptyResponse,
    /// The payload exceeded the size limit; carries the offending size in bytes.
    TooLarge(usize),
    /// The requested file path does not exist.
    FileNotFound(String),
    /// Reading the file or its metadata failed for a reason other than permissions.
    FileReadError(String),
    /// The process is not allowed to read the file.
    PermissionDenied(String),
    /// The path is empty or does not refer to a regular file.
    InvalidPath(String),
}

impl std::fmt::Display for SourceError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            // The string-carrying variants are constructed with a complete,
            // human-readable message, so it is printed verbatim.
            SourceError::InvalidUrl(msg)
            | SourceError::NetworkError(msg)
            | SourceError::InvalidResponse(msg)
            | SourceError::FileNotFound(msg)
            | SourceError::FileReadError(msg)
            | SourceError::PermissionDenied(msg)
            | SourceError::InvalidPath(msg) => f.write_str(msg),
            SourceError::Timeout => f.write_str("request timed out"),
            SourceError::EmptyResponse => f.write_str("source is empty"),
            SourceError::TooLarge(size) => write!(f, "source too large: {} bytes", size),
        }
    }
}

impl std::error::Error for SourceError {}
impl<T, Materializer> Source<T, Materializer> {
pub fn new(data: T, mat: Materializer) -> Self {
Source {
mat,
data: Some(data),
}
}
pub fn to_materializer(&self) -> &Materializer {
&self.mat
}
pub fn data(&self) -> Option<&T> {
self.data.as_ref()
}
pub fn data_len(&self) -> Option<usize>
where
T: AsRef<[u8]>,
{
self.data.as_ref().map(|d| d.as_ref().len())
}
}
impl Source<String, NotUsed> {
    /// Builds a text source whose payload is `items` joined with `"\n"`.
    ///
    /// An empty `items` vector still yields a present (empty-string) payload.
    pub fn from_items(items: Vec<String>) -> Self {
        Self {
            mat: NotUsed,
            data: Some(items.join("\n")),
        }
    }

    /// Replaces the payload with `f(payload)`; `f` is not called for an empty source.
    pub fn map<F>(mut self, f: F) -> Self
    where
        F: FnOnce(String) -> String,
    {
        self.data = self.data.take().map(f);
        self
    }

    /// Empties the source unless `f` accepts the payload; `f` is not called when empty.
    pub fn filter<F>(mut self, f: F) -> Self
    where
        F: FnOnce(&String) -> bool,
    {
        self.data = self.data.take().filter(f);
        self
    }

    /// Applies `f` to every line and re-joins the results with `"\n"`.
    ///
    /// Lines are split with `str::lines`, so `"\r\n"` endings become `"\n"` and a
    /// trailing line terminator is not preserved.
    pub fn map_lines<F>(mut self, f: F) -> Self
    where
        F: Fn(&str) -> String,
    {
        self.data = self.data.take().map(|text| {
            text.lines()
                .map(&f)
                .collect::<Vec<String>>()
                .join("\n")
        });
        self
    }

    /// Keeps only the lines accepted by `f`, re-joined with `"\n"`.
    ///
    /// Same line-splitting caveats as `map_lines`; if no line survives, the
    /// payload becomes an empty string (the source itself stays non-empty).
    pub fn filter_lines<F>(mut self, f: F) -> Self
    where
        F: Fn(&str) -> bool,
    {
        self.data = self.data.take().map(|text| {
            text.lines()
                .filter(|line| f(line))
                .collect::<Vec<&str>>()
                .join("\n")
        });
        self
    }
}
impl Source<Vec<u8>, NotUsed> {
    /// Replaces the byte payload with `f(payload)`; `f` is not called for an empty source.
    pub fn map<F>(mut self, f: F) -> Self
    where
        F: FnOnce(Vec<u8>) -> Vec<u8>,
    {
        self.data = self.data.take().map(f);
        self
    }

    /// Empties the source unless `f` accepts the byte payload; `f` is not called when empty.
    pub fn filter<F>(mut self, f: F) -> Self
    where
        F: FnOnce(&Vec<u8>) -> bool,
    {
        self.data = self.data.take().filter(f);
        self
    }
}
#[cfg(feature = "streaming")]
impl Source<Vec<u8>, NotUsed> {
    /// Downloads an HTTP(S) resource into a byte source.
    ///
    /// The client uses a 30-second timeout and the body is capped at 10 MiB.
    /// The cap is enforced while the body is streamed in, so a server that omits
    /// or understates `Content-Length` cannot make this buffer an unbounded body.
    ///
    /// # Errors
    ///
    /// * [`SourceError::InvalidUrl`] — `url` does not parse or is not `http`/`https`.
    /// * [`SourceError::Timeout`] — sending the request or reading the body timed out.
    /// * [`SourceError::NetworkError`] — client construction, request, or body read failed.
    /// * [`SourceError::InvalidResponse`] — the server answered with a non-success status.
    /// * [`SourceError::TooLarge`] — the declared or received body exceeds 10 MiB
    ///   (for a streamed overflow, the size reported is the byte count received so far).
    /// * [`SourceError::EmptyResponse`] — the body is empty.
    pub async fn from_url(url: &str) -> Result<Self, SourceError> {
        const MAX_SIZE: usize = 10 * 1024 * 1024;

        let parsed_url = reqwest::Url::parse(url)
            .map_err(|e| SourceError::InvalidUrl(format!("Invalid URL format: {}", e)))?;
        if parsed_url.scheme() != "http" && parsed_url.scheme() != "https" {
            return Err(SourceError::InvalidUrl(format!(
                "Only HTTP(S) URLs are supported, got: {}",
                parsed_url.scheme()
            )));
        }

        let client = reqwest::Client::builder()
            .timeout(Duration::from_secs(30))
            .build()
            .map_err(|e| SourceError::NetworkError(format!("Failed to build client: {}", e)))?;

        // Reuse the already-validated URL rather than re-parsing the raw string.
        let mut response = client.get(parsed_url).send().await.map_err(|e| {
            if e.is_timeout() {
                SourceError::Timeout
            } else {
                SourceError::NetworkError(format!("Request failed: {}", e))
            }
        })?;

        let status = response.status();
        if !status.is_success() {
            return Err(SourceError::InvalidResponse(format!(
                "HTTP error: {} - {}",
                status,
                status.canonical_reason().unwrap_or("Unknown")
            )));
        }

        // Fast reject when the server declares an oversized body up front.
        let declared_len = response.content_length();
        if let Some(len) = declared_len {
            if len > MAX_SIZE as u64 {
                // Saturate instead of truncating on targets where usize < u64.
                return Err(SourceError::TooLarge(len.min(usize::MAX as u64) as usize));
            }
        }

        let read_err = |e: reqwest::Error| {
            if e.is_timeout() {
                SourceError::Timeout
            } else {
                SourceError::NetworkError(format!("Failed to read response body: {}", e))
            }
        };

        // `declared_len` is at most MAX_SIZE here, so this cast is lossless.
        let mut body: Vec<u8> = Vec::with_capacity(declared_len.unwrap_or(0) as usize);
        // Stream chunk by chunk so the limit holds even without (or with a wrong)
        // Content-Length header.
        while let Some(chunk) = response.chunk().await.map_err(read_err)? {
            let received = body.len() + chunk.len();
            if received > MAX_SIZE {
                return Err(SourceError::TooLarge(received));
            }
            body.extend_from_slice(&chunk);
        }

        if body.is_empty() {
            return Err(SourceError::EmptyResponse);
        }

        Ok(Source {
            mat: NotUsed,
            data: Some(body),
        })
    }
}
#[cfg(feature = "streaming")]
impl Source<String, NotUsed> {
    /// Downloads an HTTP(S) resource and decodes its body as UTF-8 text.
    ///
    /// Fetching, with all of its limits, is delegated to the byte-level `from_url`.
    ///
    /// # Errors
    ///
    /// Any error from the byte download, or [`SourceError::InvalidResponse`] when the
    /// body is not valid UTF-8.
    pub async fn from_url_text(url: &str) -> Result<Self, SourceError> {
        let bytes_source = Source::<Vec<u8>, NotUsed>::from_url(url).await?;
        let text = String::from_utf8(bytes_source.data.unwrap_or_default())
            .map_err(|e| SourceError::InvalidResponse(format!("Invalid UTF-8: {}", e)))?;
        Ok(Source {
            mat: NotUsed,
            data: Some(text),
        })
    }

    /// Reads a non-empty UTF-8 text file of at most 10 MiB into a text source.
    ///
    /// The size limit is checked against the file metadata and enforced again
    /// during the read, so a file that grows after the metadata check is still
    /// rejected without being buffered whole.
    ///
    /// # Errors
    ///
    /// * [`SourceError::InvalidPath`] — `path` is empty or not a regular file.
    /// * [`SourceError::FileNotFound`] — `path` does not exist.
    /// * [`SourceError::TooLarge`] — the file exceeds 10 MiB.
    /// * [`SourceError::EmptyResponse`] — the file is empty.
    /// * [`SourceError::PermissionDenied`] — the file cannot be opened or read due to permissions.
    /// * [`SourceError::InvalidResponse`] — the contents are not valid UTF-8.
    /// * [`SourceError::FileReadError`] — any other metadata or I/O failure.
    pub fn from_file(path: &str) -> Result<Self, SourceError> {
        use std::fs;
        use std::io::Read;
        use std::path::Path;

        const MAX_SIZE: u64 = 10 * 1024 * 1024;

        if path.is_empty() {
            return Err(SourceError::InvalidPath("Empty path provided".to_string()));
        }
        let file_path = Path::new(path);
        if !file_path.exists() {
            return Err(SourceError::FileNotFound(format!(
                "File not found: {}",
                path
            )));
        }
        if !file_path.is_file() {
            return Err(SourceError::InvalidPath(format!(
                "Path is not a file: {}",
                path
            )));
        }

        let metadata = fs::metadata(file_path).map_err(|e| {
            SourceError::FileReadError(format!("Failed to read file metadata: {}", e))
        })?;
        if metadata.len() > MAX_SIZE {
            // Saturate instead of truncating on targets where usize < u64.
            return Err(SourceError::TooLarge(
                metadata.len().min(usize::MAX as u64) as usize,
            ));
        }
        if metadata.len() == 0 {
            return Err(SourceError::EmptyResponse);
        }

        let io_err = |e: std::io::Error| {
            if e.kind() == std::io::ErrorKind::PermissionDenied {
                SourceError::PermissionDenied(format!("Permission denied: {}", path))
            } else {
                SourceError::FileReadError(format!("Failed to read file: {}", e))
            }
        };

        let file = fs::File::open(file_path).map_err(io_err)?;
        // Bytes first, UTF-8 afterwards: a cut at MAX_SIZE + 1 must surface as
        // TooLarge, not as a spurious invalid-UTF-8 error. The capacity cast is
        // lossless because metadata.len() <= MAX_SIZE here.
        let mut bytes = Vec::with_capacity(metadata.len() as usize);
        file.take(MAX_SIZE + 1)
            .read_to_end(&mut bytes)
            .map_err(io_err)?;
        if bytes.len() as u64 > MAX_SIZE {
            return Err(SourceError::TooLarge(bytes.len()));
        }
        if bytes.is_empty() {
            // The file was truncated after the metadata check.
            return Err(SourceError::EmptyResponse);
        }

        let contents = String::from_utf8(bytes).map_err(|_| {
            SourceError::InvalidResponse(format!("File contains invalid UTF-8: {}", path))
        })?;
        Ok(Source {
            mat: NotUsed,
            data: Some(contents),
        })
    }
}
#[cfg(feature = "streaming")]
impl Source<Vec<u8>, NotUsed> {
    /// Reads the raw bytes of a non-empty file of at most 10 MiB into a byte source.
    ///
    /// The size limit is checked against the file metadata and enforced again
    /// during the read, so a file that grows after the metadata check is still
    /// rejected without being buffered whole.
    ///
    /// # Errors
    ///
    /// * [`SourceError::InvalidPath`] — `path` is empty or not a regular file.
    /// * [`SourceError::FileNotFound`] — `path` does not exist.
    /// * [`SourceError::TooLarge`] — the file exceeds 10 MiB.
    /// * [`SourceError::EmptyResponse`] — the file is empty.
    /// * [`SourceError::PermissionDenied`] — the file cannot be opened or read due to permissions.
    /// * [`SourceError::FileReadError`] — any other metadata or I/O failure.
    pub fn from_file_bytes(path: &str) -> Result<Self, SourceError> {
        use std::fs;
        use std::io::Read;
        use std::path::Path;

        const MAX_SIZE: u64 = 10 * 1024 * 1024;

        if path.is_empty() {
            return Err(SourceError::InvalidPath("Empty path provided".to_string()));
        }
        let file_path = Path::new(path);
        if !file_path.exists() {
            return Err(SourceError::FileNotFound(format!(
                "File not found: {}",
                path
            )));
        }
        if !file_path.is_file() {
            return Err(SourceError::InvalidPath(format!(
                "Path is not a file: {}",
                path
            )));
        }

        let metadata = fs::metadata(file_path).map_err(|e| {
            SourceError::FileReadError(format!("Failed to read file metadata: {}", e))
        })?;
        if metadata.len() > MAX_SIZE {
            // Saturate instead of truncating on targets where usize < u64.
            return Err(SourceError::TooLarge(
                metadata.len().min(usize::MAX as u64) as usize,
            ));
        }
        if metadata.len() == 0 {
            return Err(SourceError::EmptyResponse);
        }

        let io_err = |e: std::io::Error| {
            if e.kind() == std::io::ErrorKind::PermissionDenied {
                SourceError::PermissionDenied(format!("Permission denied: {}", path))
            } else {
                SourceError::FileReadError(format!("Failed to read file: {}", e))
            }
        };

        let file = fs::File::open(file_path).map_err(io_err)?;
        // Read at most one byte past the limit. The capacity cast is lossless
        // because metadata.len() <= MAX_SIZE here.
        let mut contents = Vec::with_capacity(metadata.len() as usize);
        file.take(MAX_SIZE + 1)
            .read_to_end(&mut contents)
            .map_err(io_err)?;
        if contents.len() as u64 > MAX_SIZE {
            return Err(SourceError::TooLarge(contents.len()));
        }
        if contents.is_empty() {
            // The file was truncated after the metadata check.
            return Err(SourceError::EmptyResponse);
        }

        Ok(Source {
            mat: NotUsed,
            data: Some(contents),
        })
    }
}
pub struct SourceActor {
name: String,
data: Vec<StreamMessage>,
_current_index: usize,
downstream: Option<tokio::sync::mpsc::Sender<Message<StreamMessage, StreamMessage>>>,
}
impl SourceActor {
pub fn new(name: String, data: Vec<StreamMessage>) -> Self {
SourceActor {
name,
data,
_current_index: 0,
downstream: None,
}
}
pub fn set_downstream(
&mut self,
sender: tokio::sync::mpsc::Sender<Message<StreamMessage, StreamMessage>>,
) {
self.downstream = Some(sender);
}
pub async fn emit_all(&mut self) {
if let Some(downstream) = &self.downstream {
info!(
"SourceActor '{}' emitting {} messages",
self.name,
self.data.len()
);
for msg in &self.data {
let _ = downstream
.send(Message {
payload: Some(msg.clone()),
stop: false,
responder: None,
blocking: None,
})
.await;
}
let _ = downstream
.send(Message {
payload: Some(StreamMessage::Complete),
stop: false,
responder: None,
blocking: None,
})
.await;
info!("SourceActor '{}' completed emission", self.name);
}
}
}
impl Actor<StreamMessage, StreamMessage> for SourceActor {
    /// Control entry point: a `Text("start")` payload triggers `emit_all`.
    ///
    /// Any other payload is logged and ignored; a message with no payload is
    /// ignored without logging.
    async fn receive(&mut self, message: Message<StreamMessage, StreamMessage>) {
        let Some(payload) = message.payload else {
            return;
        };

        let is_start_command = matches!(&payload, StreamMessage::Text(cmd) if cmd == "start");
        if is_start_command {
            info!("SourceActor '{}' received start command", self.name);
            self.emit_all().await;
        } else {
            info!("SourceActor '{}' received unexpected message", self.name);
        }
    }
}
#[cfg(feature = "streaming")]
impl Source<Vec<u8>, NotUsed> {
    /// Wires this byte source directly into `sink` and starts it.
    ///
    /// The sink actor is spawned first so the source can be handed its channel.
    /// A `SourceActor` named "ByteSource" is then spawned and sent a
    /// `Text("start")` command. A present payload is emitted as a single
    /// `StreamMessage::Data`; an empty source emits only the completion marker.
    ///
    /// # Panics
    ///
    /// Panics if a just-spawned actor cannot be looked up by its predicted id.
    // NOTE(review): ids are predicted as `get_actor_count()` before each spawn,
    // which assumes the actor system assigns ids sequentially — confirm.
    pub async fn to_sink<F>(
        self,
        actor_system: &mut ActorSystem<StreamMessage, StreamMessage>,
        sink: Sink<F>,
    ) -> StreamMaterializer
    where
        F: Fn(StreamMessage) + Send + 'static,
    {
        let mut materializer = StreamMaterializer::new();
        let data: Vec<StreamMessage> = self.data.into_iter().map(StreamMessage::Data).collect();
        let mut source_actor = SourceActor::new("ByteSource".to_string(), data);

        info!("Spawning sink actor");
        let sink_id = actor_system.get_actor_count() as u32;
        actor_system
            .spawn_actor(sink, Some("Sink".to_string()))
            .await;
        let sink_ref = actor_system
            .get_actor_ref(sink_id)
            .expect("Sink actor should exist after spawning");
        source_actor.set_downstream(sink_ref.sender.clone());

        info!("Spawning source actor");
        let source_id = actor_system.get_actor_count() as u32;
        actor_system
            .spawn_actor(source_actor, Some("Source".to_string()))
            .await;
        let source_ref = actor_system
            .get_actor_ref(source_id)
            .expect("Source actor should exist after spawning");

        materializer.set_source(source_ref.clone());
        materializer.set_sink(sink_ref);

        // Record the source -> sink edge, mirroring the edges `via_to_sink` records.
        #[cfg(feature = "visualize")]
        {
            actor_system.record_message_sent(source_id, sink_id);
        }

        source_ref
            .send(Message {
                payload: Some(StreamMessage::Text("start".to_string())),
                stop: false,
                responder: None,
                blocking: None,
            })
            .await;
        materializer
    }

    /// Wires this byte source through `flow` into `sink` and starts it.
    ///
    /// Stages are spawned downstream-first (sink, then flow, then source) so each
    /// can be handed the next stage's channel. The source is then sent a
    /// `Text("start")` command. With the `visualize` feature, the
    /// source -> flow -> sink edges are recorded on the actor system.
    ///
    /// # Panics
    ///
    /// Panics if a just-spawned actor cannot be looked up by its predicted id.
    // NOTE(review): ids are predicted as `get_actor_count()` before each spawn,
    // which assumes the actor system assigns ids sequentially — confirm.
    pub async fn via_to_sink<TransformF, SinkF>(
        self,
        actor_system: &mut ActorSystem<StreamMessage, StreamMessage>,
        flow: Flow<TransformF>,
        sink: Sink<SinkF>,
    ) -> StreamMaterializer
    where
        TransformF: Fn(StreamMessage) -> StreamMessage + Send + 'static,
        SinkF: Fn(StreamMessage) + Send + 'static,
    {
        let mut materializer = StreamMaterializer::new();
        let data: Vec<StreamMessage> = self.data.into_iter().map(StreamMessage::Data).collect();
        let mut source_actor = SourceActor::new("ByteSource".to_string(), data);

        info!("Spawning sink actor");
        let sink_id = actor_system.get_actor_count() as u32;
        actor_system
            .spawn_actor(sink, Some("Sink".to_string()))
            .await;
        let sink_ref = actor_system
            .get_actor_ref(sink_id)
            .expect("Sink actor should exist after spawning");

        info!("Spawning flow actor");
        let mut flow_actor = flow;
        flow_actor.set_downstream(sink_ref.sender.clone());
        let flow_id = actor_system.get_actor_count() as u32;
        actor_system
            .spawn_actor(flow_actor, Some("Flow".to_string()))
            .await;
        let flow_ref = actor_system
            .get_actor_ref(flow_id)
            .expect("Flow actor should exist after spawning");
        source_actor.set_downstream(flow_ref.sender.clone());

        info!("Spawning source actor");
        let source_id = actor_system.get_actor_count() as u32;
        actor_system
            .spawn_actor(source_actor, Some("Source".to_string()))
            .await;
        let source_ref = actor_system
            .get_actor_ref(source_id)
            .expect("Source actor should exist after spawning");

        materializer.set_source(source_ref.clone());
        materializer.add_flow(flow_ref);
        materializer.set_sink(sink_ref);

        #[cfg(feature = "visualize")]
        {
            actor_system.record_message_sent(source_id, flow_id);
            actor_system.record_message_sent(flow_id, sink_id);
        }

        source_ref
            .send(Message {
                payload: Some(StreamMessage::Text("start".to_string())),
                stop: false,
                responder: None,
                blocking: None,
            })
            .await;
        materializer
    }
}
#[cfg(feature = "streaming")]
impl Source<String, NotUsed> {
    /// Wires this text source directly into `sink` and starts it.
    ///
    /// The sink actor is spawned first so the source can be handed its channel.
    /// A `SourceActor` named "TextSource" is then spawned and sent a
    /// `Text("start")` command. A present payload is emitted as a single
    /// `StreamMessage::Text`; an empty source emits only the completion marker.
    ///
    /// # Panics
    ///
    /// Panics if a just-spawned actor cannot be looked up by its predicted id.
    // NOTE(review): ids are predicted as `get_actor_count()` before each spawn,
    // which assumes the actor system assigns ids sequentially — confirm.
    pub async fn to_sink<F>(
        self,
        actor_system: &mut ActorSystem<StreamMessage, StreamMessage>,
        sink: Sink<F>,
    ) -> StreamMaterializer
    where
        F: Fn(StreamMessage) + Send + 'static,
    {
        let mut materializer = StreamMaterializer::new();
        let data: Vec<StreamMessage> = self.data.into_iter().map(StreamMessage::Text).collect();
        let mut source_actor = SourceActor::new("TextSource".to_string(), data);

        info!("Spawning sink actor");
        let sink_id = actor_system.get_actor_count() as u32;
        actor_system
            .spawn_actor(sink, Some("Sink".to_string()))
            .await;
        let sink_ref = actor_system
            .get_actor_ref(sink_id)
            .expect("Sink actor should exist after spawning");
        source_actor.set_downstream(sink_ref.sender.clone());

        info!("Spawning source actor");
        let source_id = actor_system.get_actor_count() as u32;
        actor_system
            .spawn_actor(source_actor, Some("Source".to_string()))
            .await;
        let source_ref = actor_system
            .get_actor_ref(source_id)
            .expect("Source actor should exist after spawning");

        materializer.set_source(source_ref.clone());
        materializer.set_sink(sink_ref);

        // Record the source -> sink edge, mirroring the edges `via_to_sink` records.
        #[cfg(feature = "visualize")]
        {
            actor_system.record_message_sent(source_id, sink_id);
        }

        source_ref
            .send(Message {
                payload: Some(StreamMessage::Text("start".to_string())),
                stop: false,
                responder: None,
                blocking: None,
            })
            .await;
        materializer
    }

    /// Wires this text source through `flow` into `sink` and starts it.
    ///
    /// Stages are spawned downstream-first (sink, then flow, then source) so each
    /// can be handed the next stage's channel. The source is then sent a
    /// `Text("start")` command. With the `visualize` feature, the
    /// source -> flow -> sink edges are recorded on the actor system.
    ///
    /// # Panics
    ///
    /// Panics if a just-spawned actor cannot be looked up by its predicted id.
    // NOTE(review): ids are predicted as `get_actor_count()` before each spawn,
    // which assumes the actor system assigns ids sequentially — confirm.
    pub async fn via_to_sink<TransformF, SinkF>(
        self,
        actor_system: &mut ActorSystem<StreamMessage, StreamMessage>,
        flow: Flow<TransformF>,
        sink: Sink<SinkF>,
    ) -> StreamMaterializer
    where
        TransformF: Fn(StreamMessage) -> StreamMessage + Send + 'static,
        SinkF: Fn(StreamMessage) + Send + 'static,
    {
        let mut materializer = StreamMaterializer::new();
        let data: Vec<StreamMessage> = self.data.into_iter().map(StreamMessage::Text).collect();
        let mut source_actor = SourceActor::new("TextSource".to_string(), data);

        info!("Spawning sink actor");
        let sink_id = actor_system.get_actor_count() as u32;
        actor_system
            .spawn_actor(sink, Some("Sink".to_string()))
            .await;
        let sink_ref = actor_system
            .get_actor_ref(sink_id)
            .expect("Sink actor should exist after spawning");

        info!("Spawning flow actor");
        let mut flow_actor = flow;
        flow_actor.set_downstream(sink_ref.sender.clone());
        let flow_id = actor_system.get_actor_count() as u32;
        actor_system
            .spawn_actor(flow_actor, Some("Flow".to_string()))
            .await;
        let flow_ref = actor_system
            .get_actor_ref(flow_id)
            .expect("Flow actor should exist after spawning");
        source_actor.set_downstream(flow_ref.sender.clone());

        info!("Spawning source actor");
        let source_id = actor_system.get_actor_count() as u32;
        actor_system
            .spawn_actor(source_actor, Some("Source".to_string()))
            .await;
        let source_ref = actor_system
            .get_actor_ref(source_id)
            .expect("Source actor should exist after spawning");

        materializer.set_source(source_ref.clone());
        materializer.add_flow(flow_ref);
        materializer.set_sink(sink_ref);

        #[cfg(feature = "visualize")]
        {
            actor_system.record_message_sent(source_id, flow_id);
            actor_system.record_message_sent(flow_id, sink_id);
        }

        source_ref
            .send(Message {
                payload: Some(StreamMessage::Text("start".to_string())),
                stop: false,
                responder: None,
                blocking: None,
            })
            .await;
        materializer
    }
}