# briefcase-python 2.4.1
#
# Python bindings for Briefcase AI
# Documentation
"""
KafkaPublisher — publish BriefcaseEvents to a Kafka topic.

Requires confluent-kafka (optional dependency).
"""

from __future__ import annotations

import json
import logging
from typing import List

from briefcase.events.types import BriefcaseEvent


class KafkaPublisher:
    """Publish BriefcaseEvents as JSON messages to a Kafka topic.

    The confluent-kafka ``Producer`` is created lazily on the first call to
    :meth:`publish`, so constructing a publisher neither imports
    confluent-kafka nor contacts a broker.

    Producing is asynchronous: :meth:`publish` only hands the message to the
    client's local queue. Call :meth:`flush` before shutting down to wait
    for queued messages to be delivered.

    Args:
        brokers: List of Kafka broker addresses (e.g. ["kafka:9092"]).
            A single address passed as a plain string is accepted too.
        topic: Target Kafka topic name.
    """

    # Shared logger; publish failures are reported here instead of vanishing.
    _logger = logging.getLogger(__name__)

    def __init__(self, brokers: List[str], topic: str) -> None:
        # A bare string would otherwise be joined character by character
        # ("k,a,f,k,a,...") when bootstrap.servers is built.
        self._brokers = [brokers] if isinstance(brokers, str) else list(brokers)
        self._topic = topic
        self._producer = None  # lazily initialised on first publish

    # ------------------------------------------------------------------
    # Internal helpers
    # ------------------------------------------------------------------

    def _get_producer(self):
        """Return (or lazily create) the confluent-kafka Producer.

        Raises:
            ImportError: If confluent-kafka is not installed.
        """
        if self._producer is None:
            try:
                from confluent_kafka import Producer
            except ImportError as exc:
                raise ImportError(
                    "confluent-kafka is required for KafkaPublisher. "
                    "Install it with: pip install 'briefcase-ai[kafka]'"
                ) from exc
            self._producer = Producer(
                {"bootstrap.servers": ",".join(self._brokers)}
            )
        return self._producer

    @classmethod
    def _on_delivery(cls, err, msg) -> None:
        """Delivery-report callback: log messages that failed to be delivered."""
        if err is not None:
            cls._logger.error(
                "Kafka delivery failed for topic %s: %s", msg.topic(), err
            )

    @staticmethod
    def _serialize(event: BriefcaseEvent) -> bytes:
        """Encode *event* as a UTF-8 JSON object.

        The timestamp is written in ISO-8601 form; any payload value that
        json cannot encode natively is converted with ``str``.
        """
        return json.dumps(
            {
                "event_type": event.event_type,
                "decision_id": event.decision_id,
                "timestamp": event.timestamp.isoformat(),
                "idempotency_key": event.idempotency_key,
                "payload": event.payload,
            },
            default=str,
        ).encode()

    # ------------------------------------------------------------------
    # Public interface
    # ------------------------------------------------------------------

    def publish(self, event: BriefcaseEvent) -> bool:
        """Produce *event* to the configured Kafka topic.

        The message is keyed by the event's idempotency key.

        Returns:
            True when the message was handed to the producer's queue (actual
            delivery happens asynchronously), False when serialising or
            producing failed. Failures are logged rather than raised.

        Raises:
            ImportError: If confluent-kafka is not installed.
        """
        try:
            producer = self._get_producer()
            value = self._serialize(event)
            producer.produce(
                topic=self._topic,
                key=event.idempotency_key.encode(),
                value=value,
                on_delivery=self._on_delivery,
            )
            # Non-blocking: serves delivery callbacks for earlier messages.
            # This does NOT flush the queue; use flush() for that.
            producer.poll(0)
        except ImportError:
            # A missing optional dependency is a setup error, not a
            # transient publish failure, so let it propagate.
            raise
        except Exception:
            # Best-effort publishing: report the failure but never crash
            # the caller.
            self._logger.exception(
                "Failed to publish event to Kafka topic %r", self._topic
            )
            return False
        return True

    def flush(self, timeout: float = 10.0) -> int:
        """Wait up to *timeout* seconds for queued messages to be delivered.

        Returns:
            The number of messages still waiting in the producer queue, or
            0 if no producer has been created yet.
        """
        if self._producer is None:
            return 0
        return self._producer.flush(timeout)