Crate streamweave_timer


streamweave-timer

License: CC BY-SA 4.0

Timer and interval producer for StreamWeave
Generate events at regular intervals for time-based processing.

The streamweave-timer package provides timer and interval producers for StreamWeave. It enables generating events at regular intervals for time-based processing, scheduling, and periodic tasks.

✨ Key Features

  • IntervalProducer: Generate events at regular intervals
  • Time-Based Processing: Time-based event generation
  • Scheduling: Schedule periodic tasks
  • Timestamp Events: Emit timestamps at intervals
  • Flexible Intervals: Configurable interval durations

📦 Installation

Add this to your Cargo.toml:

[dependencies]
streamweave-timer = "0.3.0"

🚀 Quick Start

Interval Producer

use streamweave_timer::IntervalProducer;
use streamweave_pipeline::PipelineBuilder;
use std::time::Duration;

let producer = IntervalProducer::new(Duration::from_secs(1));

let pipeline = PipelineBuilder::new()
    .producer(producer)
    .consumer(|timestamp: std::time::SystemTime| {
        println!("Tick at {:?}", timestamp);
    });

pipeline.run().await?;

Periodic Processing

use streamweave_timer::IntervalProducer;
use streamweave_pipeline::PipelineBuilder;
use std::time::Duration;

let producer = IntervalProducer::new(Duration::from_secs(5));

let pipeline = PipelineBuilder::new()
    .producer(producer)
    .transformer(|_timestamp: std::time::SystemTime| {
        // Perform periodic task
        "Periodic task executed".to_string()
    })
    .consumer(|msg: String| {
        println!("{}", msg);
    });

pipeline.run().await?;

📖 API Overview

IntervalProducer

Generates events at regular intervals:

pub struct IntervalProducer {
    pub interval: Duration,
    pub config: ProducerConfig<std::time::SystemTime>,
}

Key Methods:

  • new(interval) - Create producer with interval duration
  • with_error_strategy(strategy) - Set error handling strategy
  • with_name(name) - Set component name
  • produce() - Generate stream of timestamps
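
The `new(...)` and `with_*` methods listed above follow the usual Rust builder pattern: each `with_*` call takes the value by ownership and returns it, so calls can be chained. The following is a minimal, std-only sketch of that pattern. `SketchProducer` is a hypothetical stand-in written for illustration, not the crate's `IntervalProducer`, and the `name` field and its argument type are assumptions.

```rust
use std::time::Duration;

/// Hypothetical stand-in for illustration only; not the crate's real type.
#[derive(Debug, Clone, PartialEq)]
struct SketchProducer {
    interval: Duration,
    name: Option<String>,
}

impl SketchProducer {
    /// Creates a producer with the given interval and no name.
    fn new(interval: Duration) -> Self {
        Self { interval, name: None }
    }

    /// Consumes `self` and returns it with the name set, so calls chain.
    fn with_name(mut self, name: impl Into<String>) -> Self {
        self.name = Some(name.into());
        self
    }
}

fn main() {
    let producer = SketchProducer::new(Duration::from_secs(1)).with_name("heartbeat");
    println!("{:?}", producer);
}
```

Because each builder method returns `Self`, configuration reads as a single expression and the finished value can be handed straight to a pipeline.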

📚 Usage Examples

Periodic Tasks

Execute periodic tasks:

use streamweave_timer::IntervalProducer;
use streamweave_pipeline::PipelineBuilder;
use std::time::Duration;

let producer = IntervalProducer::new(Duration::from_secs(60));

let pipeline = PipelineBuilder::new()
    .producer(producer)
    .consumer(|_timestamp: std::time::SystemTime| {
        // Runs once per minute. `perform_periodic_task` is a placeholder
        // for a function you define yourself.
        perform_periodic_task();
    });

pipeline.run().await?;

Time-Based Processing

Process events at intervals:

use streamweave_timer::IntervalProducer;
use streamweave_pipeline::PipelineBuilder;
use std::time::Duration;

let producer = IntervalProducer::new(Duration::from_millis(100));

let pipeline = PipelineBuilder::new()
    .producer(producer)
    .transformer(|timestamp: std::time::SystemTime| {
        format!("Event at {:?}", timestamp)
    })
    .consumer(|msg: String| {
        println!("{}", msg);
    });

pipeline.run().await?;

Error Handling

Configure error handling:

use streamweave_timer::IntervalProducer;
use streamweave_error::ErrorStrategy;
use std::time::Duration;

let producer = IntervalProducer::new(Duration::from_secs(1))
    .with_error_strategy(ErrorStrategy::Skip);
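
To make the idea of a skip strategy concrete, here is a minimal, std-only sketch of what "skip" usually means for a stream of fallible items: failed items are dropped and processing continues. `SketchStrategy` and `apply_strategy` are hypothetical names invented for this illustration, and the `Stop` variant is an assumption added for contrast; neither is taken from `streamweave-error`.

```rust
/// Hypothetical two-variant strategy used only for this sketch.
#[derive(Debug, Clone, Copy)]
enum SketchStrategy {
    /// Assumed for contrast: end processing at the first error.
    Stop,
    /// Drop the failed item and keep going.
    Skip,
}

/// Collects the successful items according to the chosen strategy.
fn apply_strategy<T, E>(items: Vec<Result<T, E>>, strategy: SketchStrategy) -> Vec<T> {
    let mut out = Vec::new();
    for item in items {
        match (item, strategy) {
            (Ok(value), _) => out.push(value),
            (Err(_), SketchStrategy::Skip) => continue,
            (Err(_), SketchStrategy::Stop) => break,
        }
    }
    out
}

fn main() {
    let items: Vec<Result<u32, &str>> = vec![Ok(1), Err("bad tick"), Ok(3)];
    println!("{:?}", apply_strategy(items.clone(), SketchStrategy::Skip)); // [1, 3]
    println!("{:?}", apply_strategy(items, SketchStrategy::Stop)); // [1]
}
```

For a timer that is expected to keep ticking, skipping is often the more natural choice, since one failed tick does not end the stream.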

ยง๐Ÿ—๏ธ Architecture

Timer processing flow:

Timer ──> IntervalProducer ──> Stream<SystemTime> ──> Transformer ──> Stream<T> ──> Consumer

Timer Flow:

  1. IntervalProducer generates a timestamp at each interval
  2. Timestamps flow through any transformers
  3. The consumer processes the resulting time-based events
  4. Events keep being generated at each interval for as long as the pipeline runs
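
The flow above can be sketched with the standard library alone: a producer thread emits timestamps over a channel, and the transformer and consumer stages run on the receiving side. This is an illustration of the data flow only, not how the crate is implemented. `run_sketch_pipeline` is a hypothetical helper, and unlike the real producer it stops after a fixed number of ticks so that the example terminates.

```rust
use std::sync::mpsc;
use std::thread;
use std::time::{Duration, SystemTime};

/// Emits `ticks` timestamps spaced roughly `interval` apart, maps each
/// through `transform`, and returns what the consumer stage collected.
fn run_sketch_pipeline<T, F>(interval: Duration, ticks: usize, transform: F) -> Vec<T>
where
    F: Fn(SystemTime) -> T,
{
    let (tx, rx) = mpsc::channel::<SystemTime>();

    // Producer stage: one timestamp per interval.
    let producer = thread::spawn(move || {
        for _ in 0..ticks {
            thread::sleep(interval);
            if tx.send(SystemTime::now()).is_err() {
                break; // the receiving side went away
            }
        }
        // `tx` is dropped here, which ends the receiver loop below.
    });

    // Transformer and consumer stages run on the calling thread.
    let collected: Vec<T> = rx.iter().map(transform).collect();
    producer.join().expect("producer thread panicked");
    collected
}

fn main() {
    let lines = run_sketch_pipeline(Duration::from_millis(10), 3, |ts| {
        format!("Event at {:?}", ts)
    });
    for line in &lines {
        println!("{}", line);
    }
}
```

Dropping the sender is what signals the end of the stream here; a continuously running timer would never reach that point and the consumer would keep receiving events.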

🔧 Configuration

Producer Configuration

  • Interval: Duration between events
  • Error Strategy: Error handling strategy
  • Name: Component name for logging

ยง๐Ÿ” Error Handling

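Timer errors are handled through the StreamWeave error system (streamweave-error). Set the strategy when constructing the producer:

use streamweave_timer::IntervalProducer;
use streamweave_error::ErrorStrategy;
use std::time::Duration;

let producer = IntervalProducer::new(Duration::from_secs(1))
    .with_error_strategy(ErrorStrategy::Skip);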

⚡ Performance Considerations

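  • Interval Accuracy: Intervals are approximate; actual tick times depend on runtime scheduling and system load
  • Resource Usage: A continuous timer keeps consuming resources for as long as its pipeline runs
  • Scheduling: Choose an interval that matches the task; very short intervals add wake-ups and overhead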
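
To see why intervals are only approximate, the following std-only sketch measures the real gap between ticks when sleeping for a fixed duration. It uses `std::thread::sleep` rather than the crate's own timer, so it illustrates the general effect, not the behaviour of `IntervalProducer` itself. `measure_gaps` is a hypothetical helper written for this example.

```rust
use std::thread;
use std::time::{Duration, Instant};

/// Sleeps `ticks` times for `interval` and returns the measured gap of each tick.
fn measure_gaps(interval: Duration, ticks: usize) -> Vec<Duration> {
    let mut gaps = Vec::with_capacity(ticks);
    let mut last = Instant::now();
    for _ in 0..ticks {
        thread::sleep(interval);
        let now = Instant::now();
        gaps.push(now - last);
        last = now;
    }
    gaps
}

fn main() {
    let interval = Duration::from_millis(20);
    for (i, gap) in measure_gaps(interval, 5).iter().enumerate() {
        // `saturating_sub` yields zero instead of panicking if the gap is shorter.
        println!("tick {}: {:?} (overshoot {:?})", i, gap, gap.saturating_sub(interval));
    }
}
```

Each measured gap is at least the requested interval and usually slightly longer, and the overshoot accumulates across ticks. Code that needs wall-clock alignment should compare against the emitted timestamps rather than counting ticks.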

ยง๐Ÿ“ Examples

For more examples, see:

🔗 Dependencies

streamweave-timer depends on:

  • streamweave - Core traits
  • streamweave-error - Error handling
  • streamweave-message (optional) - Message envelope support
  • tokio - Async runtime
  • futures - Stream utilities

🎯 Use Cases

Timer integration is used for:

  1. Periodic Tasks: Execute tasks at regular intervals
  2. Scheduling: Schedule periodic operations
  3. Time-Based Processing: Process events based on time
  4. Monitoring: Monitor systems at intervals

📖 Documentation

🔗 See Also

ยง๐Ÿค Contributing

Contributions are welcome! Please see the Contributing Guide for details.

📄 License

This project is licensed under the CC BY-SA 4.0 license.

Re-exports

pub use producers::*;

Modules

producers