Expand description
ยงstreamweave-timer
Timer and interval producer for StreamWeave
Generate events at regular intervals for time-based processing.
The streamweave-timer package provides timer and interval producers for StreamWeave. It enables generating events at regular intervals for time-based processing, scheduling, and periodic tasks.
ยงโจ Key Features
- IntervalProducer: Generate events at regular intervals
- Time-Based Processing: Time-based event generation
- Scheduling: Schedule periodic tasks
- Timestamp Events: Emit timestamps at intervals
- Flexible Intervals: Configurable interval durations
ยง๐ฆ Installation
Add this to your Cargo.toml:
[dependencies]
streamweave-timer = "0.3.0"ยง๐ Quick Start
ยงInterval Producer
use streamweave_timer::IntervalProducer;
use streamweave_pipeline::PipelineBuilder;
use std::time::Duration;
let producer = IntervalProducer::new(Duration::from_secs(1));
let pipeline = PipelineBuilder::new()
.producer(producer)
.consumer(|timestamp: std::time::SystemTime| {
println!("Tick at {:?}", timestamp);
});
pipeline.run().await?;ยงPeriodic Processing
use streamweave_timer::IntervalProducer;
use streamweave_pipeline::PipelineBuilder;
use std::time::Duration;
let producer = IntervalProducer::new(Duration::from_secs(5));
let pipeline = PipelineBuilder::new()
.producer(producer)
.transformer(|_timestamp: std::time::SystemTime| {
// Perform periodic task
"Periodic task executed".to_string()
})
.consumer(|msg: String| {
println!("{}", msg);
});
pipeline.run().await?;ยง๐ API Overview
ยงIntervalProducer
Generates events at regular intervals:
pub struct IntervalProducer {
pub interval: Duration,
pub config: ProducerConfig<std::time::SystemTime>,
}Key Methods:
new(interval)- Create producer with interval durationwith_error_strategy(strategy)- Set error handling strategywith_name(name)- Set component nameproduce()- Generate stream of timestamps
ยง๐ Usage Examples
ยงPeriodic Tasks
Execute periodic tasks:
use streamweave_timer::IntervalProducer;
use streamweave_pipeline::PipelineBuilder;
use std::time::Duration;
let producer = IntervalProducer::new(Duration::from_secs(60));
let pipeline = PipelineBuilder::new()
.producer(producer)
.consumer(|_timestamp: std::time::SystemTime| {
// Execute periodic task every minute
perform_periodic_task();
});
pipeline.run().await?;ยงTime-Based Processing
Process events at intervals:
use streamweave_timer::IntervalProducer;
use streamweave_pipeline::PipelineBuilder;
use std::time::Duration;
let producer = IntervalProducer::new(Duration::from_millis(100));
let pipeline = PipelineBuilder::new()
.producer(producer)
.transformer(|timestamp: std::time::SystemTime| {
format!("Event at {:?}", timestamp)
})
.consumer(|msg: String| {
println!("{}", msg);
});
pipeline.run().await?;ยงError Handling
Configure error handling:
use streamweave_timer::IntervalProducer;
use streamweave_error::ErrorStrategy;
use std::time::Duration;
let producer = IntervalProducer::new(Duration::from_secs(1))
.with_error_strategy(ErrorStrategy::Skip);ยง๐๏ธ Architecture
Timer processing flow:
Timer โโ> IntervalProducer โโ> Stream<SystemTime> โโ> Transformer โโ> Stream<T> โโ> ConsumerTimer Flow:
- IntervalProducer generates timestamps at intervals
- Timestamps flow through transformers
- Consumer processes time-based events
- Events generated continuously at intervals
ยง๐ง Configuration
ยงProducer Configuration
- Interval: Duration between events
- Error Strategy: Error handling strategy
- Name: Component name for logging
ยง๐ Error Handling
Timer errors are handled through the error system:
use streamweave_error::ErrorStrategy;
let producer = IntervalProducer::new(Duration::from_secs(1))
.with_error_strategy(ErrorStrategy::Skip);ยงโก Performance Considerations
- Interval Accuracy: Intervals are approximate
- Resource Usage: Continuous timers consume resources
- Scheduling: Use appropriate intervals for tasks
ยง๐ Examples
For more examples, see:
ยง๐ Dependencies
streamweave-timer depends on:
streamweave- Core traitsstreamweave-error- Error handlingstreamweave-message(optional) - Message envelope supporttokio- Async runtimefutures- Stream utilities
ยง๐ฏ Use Cases
Timer integration is used for:
- Periodic Tasks: Execute tasks at regular intervals
- Scheduling: Schedule periodic operations
- Time-Based Processing: Process events based on time
- Monitoring: Monitor systems at intervals
ยง๐ Documentation
ยง๐ See Also
- streamweave - Core traits
- streamweave-error - Error handling
ยง๐ค Contributing
Contributions are welcome! Please see the Contributing Guide for details.
ยง๐ License
This project is licensed under the CC BY-SA 4.0 license.
Re-exportsยง
pub use producers::*;