; xdl-database 0.1.0
;
; Database connectivity module for XDL - supports PostgreSQL, MySQL, DuckDB, SQLite, ODBC, Redis, and more.
; Documentation
; Apache Kafka Streaming Example
; Demonstrates Kafka producer/consumer operations

;+
; NAME:
;   kafka_streaming_example
;
; PURPOSE:
;   Walks through a Kafka round trip using an XDLdbDatabase object:
;   connect, create topics, produce messages, consume them back, and
;   disconnect. Progress and results are written with PRINT.
;
; CALLING SEQUENCE:
;   kafka_streaming_example
;
; INPUTS / OUTPUTS:
;   None. The connection string and topic names are fixed locals at the
;   top of the procedure.
;
; NOTES:
;   - Every result object returned by ExecuteSQL is destroyed as soon as
;     it has been used, and the database object is destroyed on both the
;     success path and the connection-failure path.
;   - NOTE(review): presumably requires a reachable broker at the address
;     in conn_str; behaviour when the broker is down depends on
;     XDLdbDatabase - confirm.
;-
PRO kafka_streaming_example
    ; Topic names used throughout the example
    events_topic = 'xdl-events'
    sensor_topic = 'sensor-data'

    ; Create a database object for Kafka
    objdb = OBJ_NEW('XDLdbDatabase')

    ; Connect to Kafka broker
    ; Format: kafka://broker1:9092,broker2:9092
    ; For local Kafka: kafka://localhost:9092
    conn_str = 'kafka://localhost:9092'

    PRINT, 'Connecting to Kafka...'
    objdb->Connect, CONNECTION=conn_str

    IF objdb->IsConnected() THEN BEGIN
        PRINT, 'Successfully connected to Kafka!'
    ENDIF ELSE BEGIN
        PRINT, 'Connection failed!'
        ; Release the database object before bailing out so the failure
        ; path does not leave it allocated.
        OBJ_DESTROY, objdb
        RETURN
    ENDELSE

    ; ==================================================================
    ; TOPIC MANAGEMENT
    ; ==================================================================

    PRINT, ''
    PRINT, '=== Topic Management ==='

    ; Create a new topic
    PRINT, 'Creating topic "' + events_topic + '"...'
    create_result = objdb->ExecuteSQL('CREATE TOPIC ' + events_topic)
    PRINT, create_result->GetData()
    create_result->Destroy()

    ; List all topics
    PRINT, ''
    PRINT, 'Listing all topics:'
    topics_result = objdb->ExecuteSQL('LIST TOPICS')
    topics_data = topics_result->GetData()
    PRINT, topics_data
    topics_result->Destroy()

    ; ==================================================================
    ; PRODUCER - Send Messages
    ; ==================================================================

    PRINT, ''
    PRINT, '=== Producing Messages ==='

    ; Common prefix of every PRODUCE command sent to the events topic
    produce_events = 'PRODUCE TO ' + events_topic + ': '

    ; Send individual messages
    PRINT, 'Sending message 1...'
    result1 = objdb->ExecuteSQL(produce_events + 'Hello from XDL!')
    PRINT, result1->GetData()
    result1->Destroy()

    PRINT, 'Sending message 2...'
    result2 = objdb->ExecuteSQL(produce_events + 'Sensor reading: temperature=25.5C')
    PRINT, result2->GetData()
    result2->Destroy()

    PRINT, 'Sending message 3...'
    result3 = objdb->ExecuteSQL(produce_events + 'Alert: System status OK')
    PRINT, result3->GetData()
    result3->Destroy()

    ; Send structured data (JSON format)
    PRINT, 'Sending JSON message...'
    json_msg = '{"timestamp": 1234567890, "sensor_id": "TEMP001", "value": 25.5, "unit": "celsius"}'
    result4 = objdb->ExecuteSQL(produce_events + json_msg)
    PRINT, result4->GetData()
    result4->Destroy()

    ; Send a short series of data points, one message each
    FOR i = 1, 5 DO BEGIN
        msg = 'Data point ' + STRTRIM(i,2) + ': value=' + STRTRIM(i*10.0,2)
        result = objdb->ExecuteSQL(produce_events + msg)
        result->Destroy()
    ENDFOR
    PRINT, 'Sent 5 data points'

    ; Wait a moment for messages to be available
    WAIT, 1

    ; ==================================================================
    ; CONSUMER - Read Messages
    ; ==================================================================

    PRINT, ''
    PRINT, '=== Consuming Messages ==='

    ; Consume messages from the topic, explicitly limited to 5
    PRINT, 'Reading first 5 messages from ' + events_topic + ':'
    consume_result = objdb->ExecuteSQL('CONSUME FROM ' + events_topic + ' LIMIT 5')

    n_messages = consume_result->RowCount()
    PRINT, 'Received ' + STRTRIM(n_messages,2) + ' messages'
    PRINT, ''

    ; Display messages (skipped entirely when nothing was returned)
    IF n_messages GT 0 THEN BEGIN
        col_names = consume_result->ColumnNames()
        PRINT, 'Columns:', col_names

        messages = consume_result->GetData()
        PRINT, ''
        PRINT, 'Messages:'
        PRINT, messages

        ; Get specific column (payload)
        payloads = consume_result->GetColumn('payload')
        PRINT, ''
        PRINT, 'Message Payloads:'
        FOR i = 0, N_ELEMENTS(payloads)-1 DO BEGIN
            PRINT, '  ' + STRTRIM(i+1,2) + ': ' + payloads[i]
        ENDFOR
    ENDIF

    consume_result->Destroy()

    ; ==================================================================
    ; STREAM PROCESSING EXAMPLE
    ; ==================================================================

    PRINT, ''
    PRINT, '=== Stream Processing Example ==='

    ; Create a sensor data topic. ExecuteSQL is used as a function here,
    ; as everywhere else in this example, so the returned result object
    ; can be destroyed instead of being dropped.
    sensor_topic_result = objdb->ExecuteSQL('CREATE TOPIC ' + sensor_topic)
    sensor_topic_result->Destroy()

    ; Simulate sensor data streaming
    PRINT, 'Simulating sensor data stream...'
    FOR i = 1, 10 DO BEGIN
        ; Uniform random readings: temperature in [20, 30), humidity in [40, 80)
        temperature = 20.0 + RANDOMU(seed) * 10.0
        humidity = 40.0 + RANDOMU(seed) * 40.0
        timestamp = SYSTIME(/SECONDS)

        ; Format as JSON
        sensor_msg = '{"sensor":"DHT22","temp":' + STRTRIM(temperature,2) + $
                     ',"humid":' + STRTRIM(humidity,2) + $
                     ',"time":' + STRTRIM(timestamp,2) + '}'

        result = objdb->ExecuteSQL('PRODUCE TO ' + sensor_topic + ': ' + sensor_msg)
        result->Destroy()

        WAIT, 0.1  ; Small delay between messages
    ENDFOR
    PRINT, 'Sent 10 sensor readings'

    ; Read and process stream data
    WAIT, 1
    PRINT, ''
    PRINT, 'Processing sensor data stream...'
    sensor_result = objdb->ExecuteSQL('CONSUME FROM ' + sensor_topic + ' LIMIT 10')

    IF sensor_result->RowCount() GT 0 THEN BEGIN
        payloads = sensor_result->GetColumn('payload')

        ; Parse and analyze (simplified - in real use, parse JSON)
        PRINT, 'Received ' + STRTRIM(N_ELEMENTS(payloads),2) + ' sensor readings'
        PRINT, 'Sample readings:'
        ; Show at most the first three payloads
        FOR i = 0, MIN([2, N_ELEMENTS(payloads)-1]) DO BEGIN
            PRINT, '  ' + payloads[i]
        ENDFOR
    ENDIF

    sensor_result->Destroy()

    ; ==================================================================
    ; CLEANUP
    ; ==================================================================

    PRINT, ''
    PRINT, '=== Cleanup ==='

    ; Delete test topics (optional)
    ; Note: Topic deletion may require broker configuration
    ; delete_result = objdb->ExecuteSQL('DELETE TOPIC ' + events_topic)
    ; PRINT, delete_result->GetData()
    ; delete_result->Destroy()

    ; Disconnect
    objdb->Disconnect()
    OBJ_DESTROY, objdb

    PRINT, ''
    PRINT, 'Kafka streaming example completed!'
ENDPRO

; Run the example: invoke the procedure defined above when this file is
; executed as a script.
kafka_streaming_example